• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python log.debug函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyon.public.log.debug函数的典型用法代码示例。如果您正苦于以下问题:Python debug函数的具体用法?Python debug怎么用?Python debug使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了debug函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _expect_from_root

    def _expect_from_root(self, p_root):
        """
        Start an event subscriber to the given root platform.
        To be called before any action that triggers publications from the root
        platform. It sets self._wait_root_event to a function to be called to
        wait for the event.
        """
        async_result = AsyncResult()

        # subscribe:
        event_type = "DeviceAggregateStatusEvent"

        def consume_event(evt, *args, **kwargs):
            async_result.set(evt)

        sub = EventSubscriber(event_type=event_type,
                              origin=p_root.platform_device_id,
                              callback=consume_event)

        sub.start()
        self._data_subscribers.append(sub)
        sub._ready_event.wait(timeout=CFG.endpoint.receive.timeout)

        log.debug("registered for DeviceAggregateStatusEvent")

        # set new wait function:
        def wait():
            root_evt = async_result.get(timeout=CFG.endpoint.receive.timeout)
            return root_evt

        self._wait_root_event = wait
开发者ID:ednad,项目名称:coi-services,代码行数:31,代码来源:test_platform_status.py


示例2: _get_ports_using_agent_device_map

 def _get_ports_using_agent_device_map(self):
     ports = {}
     for port_id, port in self._nnode.ports.iteritems():
         ports[port_id] = {'comms': port.comms, 'attrs': port.attrs}
     log.debug("%r: _get_ports_using_agent_device_map: %s",
           self._platform_id, ports)
     return ports
开发者ID:swarbhanu,项目名称:coi-services,代码行数:7,代码来源:oms_platform_driver.py


示例3: create_salinity_doubler_data_process_definition

    def create_salinity_doubler_data_process_definition(self):

        #First look to see if it exists and if not, then create it
        dpd,_ = self.rrclient.find_resources(restype=RT.DataProcessDefinition, name='salinity_doubler')
        if len(dpd) > 0:
            return dpd[0]

        # Salinity Doubler: Data Process Definition
        log.debug("Create data process definition SalinityDoublerTransform")
        dpd_obj = IonObject(RT.DataProcessDefinition,
            name='salinity_doubler',
            description='create a salinity doubler data product',
            module='ion.processes.data.transforms.example_double_salinity',
            class_name='SalinityDoubler')
        try:
            salinity_doubler_dprocdef_id = self.dataprocessclient.create_data_process_definition(dpd_obj)
        except Exception as ex:
            self.fail("failed to create new SalinityDoubler data process definition: %s" %ex)


        # create a stream definition for the data from the salinity Transform
        ctd_pdict_id = self.datasetclient.read_parameter_dictionary_by_name('ctd_parsed_param_dict', id_only=True)
        salinity_double_stream_def_id = self.pubsubclient.create_stream_definition(name='SalinityDoubler', parameter_dictionary_id=ctd_pdict_id)
        self.dataprocessclient.assign_stream_definition_to_data_process_definition(salinity_double_stream_def_id, salinity_doubler_dprocdef_id, binding='salinity' )

        return salinity_doubler_dprocdef_id
开发者ID:blazetopher,项目名称:coi-services,代码行数:26,代码来源:test_helper.py


示例4: test_plot_1

def test_plot_1():
    from coverage_model.test.examples import SimplexCoverage
    import matplotlib.pyplot as plt

    cov=SimplexCoverage.load('test_data/usgs.cov')

    log.debug('Plot the \'water_temperature\' and \'streamflow\' for all times')
    wtemp = cov.get_parameter_values('water_temperature')
    wtemp_pc = cov.get_parameter_context('water_temperature')
    sflow = cov.get_parameter_values('streamflow')
    sflow_pc = cov.get_parameter_context('streamflow')
    times = cov.get_parameter_values('time')
    time_pc = cov.get_parameter_context('time')

    fig = plt.figure()
    ax1 = fig.add_subplot(2,1,1)
    ax1.plot(times,wtemp)
    ax1.set_xlabel('{0} ({1})'.format(time_pc.name, time_pc.uom))
    ax1.set_ylabel('{0} ({1})'.format(wtemp_pc.name, wtemp_pc.uom))

    ax2 = fig.add_subplot(2,1,2)
    ax2.plot(times,sflow)
    ax2.set_xlabel('{0} ({1})'.format(time_pc.name, time_pc.uom))
    ax2.set_ylabel('{0} ({1})'.format(sflow_pc.name, sflow_pc.uom))

    plt.show(0)
开发者ID:oceanzus,项目名称:coverage-model,代码行数:26,代码来源:examples.py


示例5: _build_network_definition_using_topology

    def _build_network_definition_using_topology(self):
        """
        Uses self._topology to build the network definition.
        """
        log.debug("%r: _build_network_definition_using_topology: %s",
            self._platform_id, self._topology)

        def build(platform_id, children):
            """
            Returns the root NNode for the given platform_id with its
            children according to the given list.
            """
            nnode = NNode(platform_id)
            if self._agent_device_map:
                self._set_attributes_and_ports_from_agent_device_map(nnode)

            log.debug('Created NNode for %r', platform_id)

            for subplatform_id in children:
                subplatform_children = self._topology.get(subplatform_id, [])
                sub_nnode = build(subplatform_id, subplatform_children)
                nnode.add_subplatform(sub_nnode)

            return nnode

        children = self._topology.get(self._platform_id, [])
        return build(self._platform_id, children)
开发者ID:swarbhanu,项目名称:coi-services,代码行数:27,代码来源:oms_platform_driver.py


示例6: _get_checksum

 def _get_checksum(self, platform_id):
     # get checksum from RSN OMS:
     res = self._rsn_oms.get_checksum(platform_id)
     checksum = res[platform_id]
     if log.isEnabledFor(logging.DEBUG):
         log.debug("_rsn_oms: checksum: %s", checksum)
     return checksum
开发者ID:blazetopher,项目名称:coi-services,代码行数:7,代码来源:test_oms_util.py


示例7: enqueue

 def enqueue(self, msg):
     """
     Enqueue message for transmission so server.
     """
     log.debug('Client enqueueing message: %s.', msg)
     self._queue.append(msg)
     return len(self._queue)
开发者ID:Bobfrat,项目名称:coi-services,代码行数:7,代码来源:r3pc.py


示例8: _construct_fsm

    def _construct_fsm(self, states=PlatformDriverState,
                       events=PlatformDriverEvent,
                       enter_event=PlatformDriverEvent.ENTER,
                       exit_event=PlatformDriverEvent.EXIT):
        """
        Constructs the FSM for the driver. The preparations here are mostly
        related with the UNCONFIGURED, DISCONNECTED, and CONNECTED state
        transitions, with some common handlers for the CONNECTED state.
        Subclasses can override to indicate specific parameters and add new
        handlers (typically for the CONNECTED state).
        """
        log.debug("constructing base platform driver FSM")

        self._fsm = ThreadSafeFSM(states, events, enter_event, exit_event)

        for state in PlatformDriverState.list():
            self._fsm.add_handler(state, enter_event, self._common_state_enter)
            self._fsm.add_handler(state, exit_event, self._common_state_exit)

        # UNCONFIGURED state event handlers:
        self._fsm.add_handler(PlatformDriverState.UNCONFIGURED, PlatformDriverEvent.CONFIGURE, self._handler_unconfigured_configure)

        # DISCONNECTED state event handlers:
        self._fsm.add_handler(PlatformDriverState.DISCONNECTED, PlatformDriverEvent.CONNECT, self._handler_disconnected_connect)
        self._fsm.add_handler(PlatformDriverState.DISCONNECTED, PlatformDriverEvent.DISCONNECT, self._handler_disconnected_disconnect)

        # CONNECTED state event handlers:
        self._fsm.add_handler(PlatformDriverState.CONNECTED, PlatformDriverEvent.DISCONNECT, self._handler_connected_disconnect)
        self._fsm.add_handler(PlatformDriverState.CONNECTED, PlatformDriverEvent.CONNECTION_LOST, self._handler_connected_connection_lost)

        self._fsm.add_handler(PlatformDriverState.CONNECTED, PlatformDriverEvent.PING, self._handler_connected_ping)
        self._fsm.add_handler(PlatformDriverState.CONNECTED, PlatformDriverEvent.GET, self._handler_connected_get)
        self._fsm.add_handler(PlatformDriverState.CONNECTED, PlatformDriverEvent.SET, self._handler_connected_set)
        self._fsm.add_handler(PlatformDriverState.CONNECTED, PlatformDriverEvent.EXECUTE, self._handler_connected_execute)
开发者ID:JeffRoy,项目名称:marine-integrations,代码行数:34,代码来源:platform_driver.py


示例9: _publish_for_child

    def _publish_for_child(self, child_obj, status_name, status):
        """
        Publishes a DeviceAggregateStatusEvent from the given platform or
        instrument object.

        NOTE that we just publish on behalf of the child, but the statuses in
        the child itself are *not* set. This is OK for these tests; we just
        need that child's ancestors to react to the event.
        """

        if 'platform_device_id' in child_obj:
            origin = child_obj.platform_device_id
            origin_type = "PlatformDevice"
        else:
            origin = child_obj.instrument_device_id
            origin_type = "InstrumentDevice"

        # create and publish event from the given origin and type:
        evt = dict(event_type='DeviceAggregateStatusEvent',
                   origin_type=origin_type,
                   origin=origin,
                   description="Fake event for testing",
                   status_name=status_name,
                   status=status)

        log.debug("publishing for child %r: evt=%s", origin, evt)
        self._event_publisher.publish_event(**evt)
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:27,代码来源:test_platform_status.py


示例10: recv_packet

    def recv_packet(self, msg, stream_route, stream_id):
        '''
        The consumer callback to parse and manage the granule.
        The message is ACK'd once the function returns
        '''
        log.trace('received granule for stream %s', stream_id)

        if msg == {}:
            log.error('Received empty message from stream: %s', stream_id)
            return
        # Message validation
        if not isinstance(msg, Granule):
            log.error('Ingestion received a message that is not a granule: %s', msg)
            return


        rdt = RecordDictionaryTool.load_from_granule(msg)
        if rdt is None:
            log.error('Invalid granule (no RDT) for stream %s', stream_id)
            return
        if not len(rdt):
            log.debug('Empty granule for stream %s', stream_id)
            return

        self.persist_or_timeout(stream_id, rdt)
开发者ID:lukecampbell,项目名称:coi-services,代码行数:25,代码来源:science_granule_ingestion_worker.py


示例11: consume_event

 def consume_event(self, evt, *args, **kwargs):
     """
     Test callback for events.
     """
     log.debug('Test got event: %s, args: %s, kwargs: %s',
               str(evt), str(args), str(kwargs))
     
     if evt.type_ == 'PublicPlatformTelemetryEvent':
         self._telem_evts.append(evt)
         if self._no_telem_evts > 0 and self._no_telem_evts == len(self._telem_evts):
                 self._done_telem_evt.set()
                 
     elif evt.type_ == 'RemoteQueueModifiedEvent':
         self._queue_mod_evts.append(evt)
         if self._no_queue_mod_evts > 0 and self._no_queue_mod_evts == len(self._queue_mod_evts):
                 self._done_queue_mod_evt.set()
         
     elif evt.type_ == 'RemoteCommandTransmittedEvent':
         self._cmd_tx_evts.append(evt)
         if self._no_cmd_tx_evts > 0 and self._no_cmd_tx_evts == len(self._cmd_tx_evts):
                 self._done_cmd_tx_evt.set()
     
     elif evt.type_ == 'RemoteCommandResult':
         cmd = evt.command
         self._results_recv[cmd.command_id] = cmd
         if len(self._results_recv) == self._no_requests:
             self._done_cmd_evt.set()
开发者ID:kerfoot,项目名称:coi-services,代码行数:27,代码来源:test_2caa.py


示例12: _construct_stream_and_publisher

    def _construct_stream_and_publisher(self, stream_name, stream_config):

        if log.isEnabledFor(logging.TRACE):  # pragma: no cover
            log.trace("%r: _construct_stream_and_publisher: "
                      "stream_name:%r, stream_config:\n%s",
                      self._platform_id, stream_name,
                      self._pp.pformat(stream_config))

        decoder = IonObjectDeserializer(obj_registry=get_obj_registry())

        if 'stream_def_dict' not in stream_config:
            # should not happen: PlatformAgent._validate_configuration validates this.
            log.error("'stream_def_dict' key not in configuration for stream %r" % stream_name)
            return

        stream_def_dict = stream_config['stream_def_dict']
        stream_def_dict['type_'] = 'StreamDefinition'
        stream_def_obj = decoder.deserialize(stream_def_dict)
        self._stream_defs[stream_name] = stream_def_obj

        routing_key           = stream_config['routing_key']
        stream_id             = stream_config['stream_id']
        exchange_point        = stream_config['exchange_point']
        parameter_dictionary  = stream_def_dict['parameter_dictionary']
        log.debug("%r: got parameter_dictionary from stream_def_dict", self._platform_id)

        self._data_streams[stream_name] = stream_id
        self._param_dicts[stream_name] = ParameterDictionary.load(parameter_dictionary)
        stream_route = StreamRoute(exchange_point=exchange_point, routing_key=routing_key)
        publisher = self._create_publisher(stream_id, stream_route)
        self._data_publishers[stream_name] = publisher

        log.debug("%r: created publisher for stream_name=%r", self._platform_id, stream_name)
开发者ID:edwardhunter,项目名称:coi-services,代码行数:33,代码来源:platform_agent_stream_publisher.py


示例13: __init__

    def __init__(self, agent):
        self._agent = agent

        self._platform_id = agent._platform_id
        self.resource_id  = agent.resource_id
        self._pp          = agent._pp
        self.CFG          = agent.CFG

        # Dictionaries used for data publishing.
        self._data_streams = {}
        self._param_dicts = {}
        self._stream_defs = {}
        self._data_publishers = {}

        self._connection_ID = None
        self._connection_index = {}

        # Set of parameter names received in event notification but not
        # configured. Allows to log corresponding warning only once.
        self._unconfigured_params = set()

        stream_info = self.CFG.get('stream_config', None)
        if stream_info is None:
            # should not happen: PlatformAgent._validate_configuration validates this.
            log.error("%r: No stream_config given in CFG", self._platform_id)
            return

        for stream_name, stream_config in stream_info.iteritems():
            self._construct_stream_and_publisher(stream_name, stream_config)

        log.debug("%r: PlatformAgentStreamPublisher complete", self._platform_id)
开发者ID:edwardhunter,项目名称:coi-services,代码行数:31,代码来源:platform_agent_stream_publisher.py


示例14: handle_attribute_value_event

    def handle_attribute_value_event(self, driver_event):
        if log.isEnabledFor(logging.TRACE):  # pragma: no cover
            # show driver_event as retrieved (driver_event.vals_dict might be large)
            log.trace("%r: driver_event = %s", self._platform_id, driver_event)
            log.trace("%r: vals_dict:\n%s",
                      self._platform_id, self._pp.pformat(driver_event.vals_dict))

        elif log.isEnabledFor(logging.DEBUG):  # pragma: no cover
            log.debug("%r: driver_event = %s", self._platform_id, driver_event.brief())

        stream_name = driver_event.stream_name

        publisher = self._data_publishers.get(stream_name, None)
        if not publisher:
            log.warn('%r: no publisher configured for stream_name=%r. '
                     'Configured streams are: %s',
                     self._platform_id, stream_name, self._data_publishers.keys())
            return

        param_dict = self._param_dicts[stream_name]
        stream_def = self._stream_defs[stream_name]

        if isinstance(stream_def, str):
            rdt = RecordDictionaryTool(param_dictionary=param_dict.dump(),
                                       stream_definition_id=stream_def)
        else:
            rdt = RecordDictionaryTool(stream_definition=stream_def)

        self._publish_granule_with_multiple_params(publisher, driver_event,
                                                   param_dict, rdt)
开发者ID:edwardhunter,项目名称:coi-services,代码行数:30,代码来源:platform_agent_stream_publisher.py


示例15: _check_computed_attributes_of_extended_product

    def _check_computed_attributes_of_extended_product(self, expected_data_product_id = '', extended_data_product = None):

        self.assertEqual(expected_data_product_id, extended_data_product._id)
        log.debug("extended_data_product.computed: %s", extended_data_product.computed)

        # Verify that computed attributes exist for the extended instrument
        self.assertIsInstance(extended_data_product.computed.product_download_size_estimated, ComputedFloatValue)
        self.assertIsInstance(extended_data_product.computed.number_active_subscriptions, ComputedIntValue)
        self.assertIsInstance(extended_data_product.computed.data_url, ComputedStringValue)
        self.assertIsInstance(extended_data_product.computed.stored_data_size, ComputedIntValue)
        self.assertIsInstance(extended_data_product.computed.recent_granules, ComputedDictValue)
        self.assertIsInstance(extended_data_product.computed.parameters, ComputedListValue)
        self.assertIsInstance(extended_data_product.computed.recent_events, ComputedEventListValue)

        self.assertIsInstance(extended_data_product.computed.provenance, ComputedDictValue)
        self.assertIsInstance(extended_data_product.computed.user_notification_requests, ComputedListValue)
        self.assertIsInstance(extended_data_product.computed.active_user_subscriptions, ComputedListValue)
        self.assertIsInstance(extended_data_product.computed.past_user_subscriptions, ComputedListValue)
        self.assertIsInstance(extended_data_product.computed.last_granule, ComputedDictValue)
        self.assertIsInstance(extended_data_product.computed.is_persisted, ComputedIntValue)
        self.assertIsInstance(extended_data_product.computed.data_contents_updated, ComputedStringValue)
        self.assertIsInstance(extended_data_product.computed.data_datetime, ComputedListValue)

        # exact text here keeps changing to fit UI capabilities.  keep assertion general...
        self.assertEqual( 2, len(extended_data_product.computed.data_datetime.value) )

        notifications = extended_data_product.computed.user_notification_requests.value

        notification = notifications[0]
        self.assertEqual(expected_data_product_id, notification.origin)
        self.assertEqual("data product", notification.origin_type)
        self.assertEqual('DetectionEvent', notification.event_type)
开发者ID:MatthewArrott,项目名称:coi-services,代码行数:32,代码来源:test_activate_instrument.py


示例16: _create_interval_timer_with_end_time

    def _create_interval_timer_with_end_time(self,timer_interval= None, end_time = None ):
        '''
        A convenience method to set up an interval timer with an end time
        '''
        self.timer_received_time = 0
        self.timer_interval = timer_interval

        start_time = self.now_utc()
        if not end_time:
            end_time = start_time + 2 * timer_interval + 1

        log.debug("got the end time here!! %s" % end_time)

        # Set up the interval timer. The scheduler will publish event with origin set as "Interval Timer"
        sid = self.ssclient.create_interval_timer(start_time="now" ,
            interval=self.timer_interval,
            end_time=end_time,
            event_origin="Interval Timer",
            event_subtype="")

        def cleanup_timer(scheduler, schedule_id):
            """
            Do a friendly cancel of the scheduled event.
            If it fails, it's ok.
            """
            try:
                scheduler.cancel_timer(schedule_id)
            except:
                log.warn("Couldn't cancel")

        self.addCleanup(cleanup_timer, self.ssclient, sid)

        return sid
开发者ID:kerfoot,项目名称:coi-services,代码行数:33,代码来源:test_transform_prototype.py


示例17: create_resources_snapshot

    def create_resources_snapshot(self, persist=False, filename=None):
        ds = CouchDataStore(DataStore.DS_RESOURCES, profile=DataStore.DS_PROFILE.RESOURCES, config=CFG, scope=self.sysname)
        all_objs = ds.find_docs_by_view("_all_docs", None, id_only=False)

        log.info("Found %s objects in datastore resources", len(all_objs))

        resources = {}
        associations = {}
        snapshot = dict(resources=resources, associations=associations)

        for obj_id, key, obj in all_objs:
            if obj_id.startswith("_design"):
                continue
            if not isinstance(obj, dict):
                raise Inconsistent("Object of bad type found: %s" % type(obj))
            obj_type = obj.get("type_", None)
            if obj_type == "Association":
                associations[obj_id] = obj.get("ts", None)
            elif obj_type:
                resources[obj_id] = obj.get("ts_updated", None)
            else:
                raise Inconsistent("Object with no type_ found: %s" % obj)

        if persist:
            dtstr = datetime.datetime.today().strftime('%Y%m%d_%H%M%S')
            path = filename or "interface/rrsnapshot_%s.json" % dtstr
            snapshot_json = json.dumps(snapshot)
            with open(path, "w") as f:
                #yaml.dump(snapshot, f, default_flow_style=False)
                f.write(snapshot_json)

        log.debug("Created resource registry snapshot. %s resources, %s associations", len(resources), len(associations))

        return snapshot
开发者ID:MauriceManning,项目名称:coi-services,代码行数:34,代码来源:resources.py


示例18: find_events

    def find_events(self, origin='', type='', min_datetime='', max_datetime='', limit=-1,
                    descending=False, skip=0, computed=False):
        """
        Returns a list of events that match the specified search criteria.
        Can return a list of EventComputedAttributes if requested with event objects contained.
        Pagination arguments are supported.

        @param origin         str
        @param min_datetime   str  milliseconds
        @param max_datetime   str  milliseconds
        @param limit          int         (integer limiting the number of results (0 means unlimited))
        @param descending     boolean     (if True, reverse order (of production time) is applied, e.g. most recent first)
        @retval event_list    []
        """
        if limit == 0:
            limit = int(self.CFG.get_safe("service.user_notification.max_events_limit", 1000))
        if max_datetime == "now":
            max_datetime = get_ion_ts()

        event_tuples = self.container.event_repository.find_events(event_type=type, origin=origin,
                                                                   start_ts=min_datetime, end_ts=max_datetime,
                                                                   limit=limit, descending=descending, skip=skip)

        events = [item[2] for item in event_tuples]
        log.debug("find_events found %s events", len(events))

        if computed:
            computed_events = self._get_computed_events(events, include_events=True)
            events = computed_events.computed_list

        return events
开发者ID:MatthewArrott,项目名称:coi-services,代码行数:31,代码来源:user_notification_service.py


示例19: _extract_granule_data

    def _extract_granule_data(self, granules):
        """
        Pull all data out of all granules and return a dict of values
        """
        result = []
        for granule in granules:
            group = []
            log.debug("Granule: %s", granule)
            rdt = RecordDictionaryTool.load_from_granule(granule)

            # Store the data from each record
            for key, value in rdt.iteritems():
                for i in range(0, len(value)):
                    if len(group) <= i:
                        group.append({})
                    group[i][key] = value[i]

                    # Store the connection information for each record
                    if not 'connection_index' in group[i]:
                        group[i]['connection_index'] = granule.connection_index

                    if not 'connection_id' in group[i]:
                        group[i]['connection_id'] = granule.connection_id



            result += group

        log.debug("extracted granules: %s", pprint.pformat(result))

        return result
开发者ID:ednad,项目名称:coi-services,代码行数:31,代码来源:result_set.py


示例20: _get_computed_events

    def _get_computed_events(self, events, add_usernames=True, include_events=False):
        """
        Get events for use in extended resource computed attribute
        @retval ComputedListValue with value list of 4-tuple with Event objects
        """
        events = events or []

        ret = IonObject(OT.ComputedEventListValue)
        ret.value = events
        ret.computed_list = [get_event_computed_attributes(event, include_event=include_events) for event in events]
        ret.status = ComputedValueAvailability.PROVIDED

        if add_usernames:
            try:
                actor_ids = {evt.actor_id for evt in events if evt.actor_id}
                log.debug("Looking up UserInfo for actors: %s" % actor_ids)
                if actor_ids:
                    userinfo_list, assoc_list = self.clients.resource_registry.find_objects_mult(actor_ids,
                                                                                                 predicate=PRED.hasInfo,
                                                                                                 id_only=False)
                    actor_map = {assoc.s: uinfo for uinfo, assoc in zip(userinfo_list, assoc_list)}

                    for evt, evt_cmp in zip(events, ret.computed_list):
                        ui = actor_map.get(evt.actor_id, None)
                        if ui:
                            evt_cmp["event_summary"] += " [%s %s]" % (ui.contact.individual_names_given, ui.contact.individual_name_family)

            except Exception as ex:
                log.exception("Cannot find user names for event actor_ids")

        return ret
开发者ID:MatthewArrott,项目名称:coi-services,代码行数:31,代码来源:user_notification_service.py



注:本文中的pyon.public.log.debug函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python log.error函数代码示例发布时间:2022-05-27
下一篇:
Python public.IonObject类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap