本文整理汇总了Python中pyon.public.log.warn函数的典型用法代码示例。如果您正苦于以下问题:Python warn函数的具体用法?Python warn怎么用?Python warn使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了warn函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: acquire_samples
def acquire_samples(self, max_samples=0):
log.debug('Orb_DataAgentPlugin.acquire_samples')
if os.path.exists(self.data_dir):
files = os.listdir(self.data_dir)
cols = []
rows = []
for f in files:
fpath = self.data_dir + f
with open(fpath) as fh:
try:
pkt = json.load(fh)
if not cols:
cols = [str(c['chan']) for c in pkt['channels']]
row = self._extract_row(pkt, cols)
dims = [len(c) for c in row[:3]]
if all(d==400 for d in dims):
rows.append(row)
else:
log.warning('Inconsistent dimensions %s, %s' % (str(dims), fpath))
fh.close()
os.remove(fpath)
log.info('sample: ' + fpath)
except Exception as ex:
log.warn(ex)
log.warn('Incomplete packet %s' % fpath)
if cols and rows:
coltypes = {}
for c in cols:
coltypes[c] = '400i4'
cols.append('time')
samples = dict(cols=cols, data=rows, coltypes=coltypes)
return samples
开发者ID:scion-network,项目名称:scion,代码行数:33,代码来源:orb_plugin.py
示例2: handle_attribute_value_event
def handle_attribute_value_event(self, driver_event):
if log.isEnabledFor(logging.TRACE): # pragma: no cover
# show driver_event as retrieved (driver_event.vals_dict might be large)
log.trace("%r: driver_event = %s", self._platform_id, driver_event)
log.trace("%r: vals_dict:\n%s",
self._platform_id, self._pp.pformat(driver_event.vals_dict))
elif log.isEnabledFor(logging.DEBUG): # pragma: no cover
log.debug("%r: driver_event = %s", self._platform_id, driver_event.brief())
stream_name = driver_event.stream_name
publisher = self._data_publishers.get(stream_name, None)
if not publisher:
log.warn('%r: no publisher configured for stream_name=%r. '
'Configured streams are: %s',
self._platform_id, stream_name, self._data_publishers.keys())
return
param_dict = self._param_dicts[stream_name]
stream_def = self._stream_defs[stream_name]
if isinstance(stream_def, str):
rdt = RecordDictionaryTool(param_dictionary=param_dict.dump(),
stream_definition_id=stream_def)
else:
rdt = RecordDictionaryTool(stream_definition=stream_def)
self._publish_granule_with_multiple_params(publisher, driver_event,
param_dict, rdt)
开发者ID:edwardhunter,项目名称:coi-services,代码行数:30,代码来源:platform_agent_stream_publisher.py
示例3: _get_granule_header_errors
def _get_granule_header_errors(self, granule, granule_def):
"""
Verify all parameters defined in the header:
- Stream type
- Internal timestamp
"""
errors = []
granule_dict = self._granule_as_dict(granule)
granule_timestamp = granule_dict.get('internal_timestamp')
expected_time = granule_def.get('internal_timestamp')
# Verify the timestamp
if granule_timestamp and not expected_time:
errors.append("granule_timestamp defined in granule, but not expected")
elif not granule_timestamp and expected_time:
errors.append("granule_timestamp expected, but not defined in granule")
# If we have a timestamp AND expect one then compare values
elif (granule_timestamp and
granule_timestamp != self._string_to_ntp_date_time(expected_time)):
errors.append("expected internal_timestamp mismatch, %f != %f (%f)" %
(self._string_to_ntp_date_time(expected_time), granule_timestamp,
self._string_to_ntp_date_time(expected_time)- granule_timestamp))
# verify the stream name
granule_stream = granule_dict.get('stream_name')
if granule_stream:
expected_stream = self._result_set_header['granule_type']
if granule_stream != expected_stream:
errors.append("expected stream name mismatch: %s != %s" %
(expected_stream, granule_stream))
else:
log.warn("No stream defined in granule. Not verifying stream name.")
return errors
开发者ID:ednad,项目名称:coi-services,代码行数:35,代码来源:result_set.py
示例4: get
def get(self, *args, **kwargs):
#TODO: Add documentation
#TODO: Fix raises statements
"""
Called from:
InstrumentAgent._handler_get_params
@raises InstrumentTimeoutException:
@raises InstrumentProtocolException:
@raises NotImplementedException:
@raises InstrumentParameterException:
"""
try:
pnames=args[0]
except IndexError:
log.warn("No argument provided to get, return all parameters")
pnames = [DataHandlerParameter.ALL]
result = None
if DataHandlerParameter.ALL in pnames:
result = self._params
else:
if not isinstance(pnames, (list,tuple)):
raise InstrumentParameterException('Get argument not a list or tuple: {0}'.format(pnames))
result={}
for pn in pnames:
try:
log.debug('Get parameter with key: {0}'.format(pn))
result[pn] = self._params[pn]
except KeyError:
log.debug('\'{0}\' not found in self._params'.format(pn))
raise InstrumentParameterException('{0} is not a valid parameter for this DataHandler.'.format(pn))
return result
开发者ID:ooici-eoi,项目名称:coi-services,代码行数:34,代码来源:base_data_handler.py
示例5: _set_calibration_for_data_product
def _set_calibration_for_data_product(self, dp_obj, dev_cfg):
from ion.util.direct_coverage_utils import DirectCoverageAccess
from coverage_model import SparseConstantType
log.debug("Setting calibration for data product '%s'", dp_obj.name)
dataset_ids, _ = self.rr.find_objects(dp_obj, PRED.hasDataset, id_only=True)
publisher = EventPublisher(OT.InformationContentModifiedEvent)
if not dataset_ids:
data_product_management = DataProductManagementServiceProcessClient(process=self)
log.debug(" Creating dataset for data product %s", dp_obj.name)
data_product_management.create_dataset_for_data_product(dp_obj._id)
dataset_ids, _ = self.rr.find_objects(dp_obj, PRED.hasDataset, id_only=True)
if not dataset_ids:
raise NotFound('No datasets were found for this data product, ensure that it was created')
for dataset_id in dataset_ids:
# Synchronize with ingestion
with DirectCoverageAccess() as dca:
cov = dca.get_editable_coverage(dataset_id)
# Iterate over the calibrations
for cal_name, contents in dev_cfg.iteritems():
if cal_name in cov.list_parameters() and isinstance(cov.get_parameter_context(cal_name).param_type, SparseConstantType):
value = float(contents['value'])
log.info(' Updating Calibrations for %s in %s', cal_name, dataset_id)
cov.set_parameter_values(cal_name, value)
else:
log.warn(" Calibration %s not found in dataset", cal_name)
publisher.publish_event(origin=dataset_id, description="Calibrations Updated")
publisher.close()
log.info("Calibration set for data product '%s' in %s coverages", dp_obj.name, len(dataset_ids))
开发者ID:j2project,项目名称:coi-services,代码行数:29,代码来源:agentctrl.py
示例6: got_event
def got_event(evt, *args, **kwargs):
if not self._active:
log.warn("%r: got_event called but manager has been destroyed",
self._platform_id)
return
if evt.type_ != event_type:
log.trace("%r: ignoring event type %r. Only handle %r directly",
self._platform_id, evt.type_, event_type)
return
if evt.sub_type != sub_type:
log.trace("%r: ignoring event sub_type %r. Only handle %r",
self._platform_id, evt.sub_type, sub_type)
return
state = self._agent.get_agent_state()
statuses = formatted_statuses(self.aparam_aggstatus,
self.aparam_child_agg_status,
self.aparam_rollup_status)
invalidated_children = self._agent._get_invalidated_children()
log.info("%r/%s: (%s) status report triggered by diagnostic event:\n"
"%s\n"
"%40s : %s\n",
self._platform_id, state, self.resource_id, statuses,
"invalidated_children", invalidated_children)
开发者ID:ednad,项目名称:coi-services,代码行数:29,代码来源:status_manager.py
示例7: _retrieve_attribute_value
def _retrieve_attribute_value(self):
"""
Retrieves the attribute value using the given function and calls
_values_retrieved with retrieved values.
"""
attrNames = [self._attr_id]
from_time = (self._last_ts + _DELTA_TIME) if self._last_ts else 0.0
if log.isEnabledFor(logging.DEBUG):
log.debug("CIDEVSA-450 %r: retrieving attribute %r from_time %f",
self._platform_id, self._attr_id, from_time)
retrieved_vals = self._get_attribute_values(attrNames, from_time)
if log.isEnabledFor(logging.DEBUG):
log.debug("CIDEVSA-450 %r: _get_attribute_values returned %s", self._platform_id, retrieved_vals)
if log.isEnabledFor(logging.DEBUG):
log.debug("CIDEVSA-450 %r: retrieved attribute %r values from_time %f = %s",
self._platform_id, self._attr_id, from_time, str(retrieved_vals))
if self._attr_id in retrieved_vals:
values = retrieved_vals[self._attr_id]
if values:
self._values_retrieved(values)
elif log.isEnabledFor(logging.DEBUG):
log.debug("CIDEVSA-450 %r: No values reported for attribute=%r from_time=%f",
self._platform_id, self._attr_id, from_time)
else:
log.warn("CIDEVSA-450 %r: unexpected: response does not include requested attribute %r",
self._platform_id, self._attr_id)
开发者ID:blazetopher,项目名称:coi-services,代码行数:32,代码来源:resource_monitor.py
示例8: set_resource
def set_resource(self, resource_id='', params=None):
"""Set the value of the given resource parameters.
@param resource_id The id of the resource agent.
@param params A dict of resource parameter name-value pairs.
@throws BadRequest if the command was malformed.
@throws NotFound if a parameter is not supported by the resource.
@throws ResourceError if the resource encountered an error while setting
the parameters.
@param resource_id str
@param params dict
@throws BadRequest if the command was malformed.
@throws NotFound if the parameter does not exist.
@throws ResourceError if the resource failed while trying to set the parameter.
"""
res_type = self._get_resource_type(resource_id)
if self._has_agent(res_type):
rac = ResourceAgentClient(resource_id=resource_id)
return rac.set_resource(resource_id=resource_id, params=params)
res_interface = self._get_type_interface(res_type)
for param in params:
setter = get_safe(res_interface, "params.%s.set" % param, None)
if setter:
self._call_setter(setter, params[param], resource_id, res_type)
else:
log.warn("set_resource(): param %s not defined", param)
开发者ID:blazetopher,项目名称:coi-services,代码行数:28,代码来源:resource_management_service.py
示例9: _start_port_agent
def _start_port_agent(self, instrument_agent_instance_obj=None):
"""
Construct and start the port agent, ONLY NEEDED FOR INSTRUMENT AGENTS.
"""
_port_agent_config = instrument_agent_instance_obj.port_agent_config
# It blocks until the port agent starts up or a timeout
_pagent = PortAgentProcess.launch_process(_port_agent_config, test_mode = True)
pid = _pagent.get_pid()
port = _pagent.get_data_port()
cmd_port = _pagent.get_command_port()
log.info("IMS:_start_pagent returned from PortAgentProcess.launch_process pid: %s ", pid)
# Hack to get ready for DEMO. Further though needs to be put int
# how we pass this config info around.
host = 'localhost'
driver_config = instrument_agent_instance_obj.driver_config
comms_config = driver_config.get('comms_config')
if comms_config:
host = comms_config.get('addr')
else:
log.warn("No comms_config specified, using '%s'" % host)
# Configure driver to use port agent port number.
instrument_agent_instance_obj.driver_config['comms_config'] = {
'addr' : host,
'cmd_port' : cmd_port,
'port' : port
}
instrument_agent_instance_obj.driver_config['pagent_pid'] = pid
self.imsclient.update_instrument_agent_instance(instrument_agent_instance_obj)
return self.imsclient.read_instrument_agent_instance(instrument_agent_instance_obj._id)
开发者ID:Bobfrat,项目名称:coi-services,代码行数:34,代码来源:test_rsn_platform_instrument.py
示例10: _set_attributes_and_ports_from_agent_device_map
def _set_attributes_and_ports_from_agent_device_map(self, nnode):
"""
Sets the attributes and ports for the given NNode from
self._agent_device_map.
"""
platform_id = nnode.platform_id
if platform_id not in self._agent_device_map:
log.warn("%r: no entry in agent_device_map for platform_id",
self._platform_id, platform_id)
return
device_obj = self._agent_device_map[platform_id]
log.info("%r: for platform_id=%r device_obj=%s",
self._platform_id, platform_id, device_obj)
attrs = device_obj.platform_monitor_attributes
ports = device_obj.ports
for attr_obj in attrs:
attr = Attr(attr_obj.id, {
'name': attr_obj.name,
'monitorCycleSeconds': attr_obj.monitor_rate,
'units': attr_obj.units,
})
nnode.add_attribute(attr)
for port_obj in ports:
port = Port(port_obj.port_id, port_obj.ip_address)
nnode.add_port(port)
开发者ID:tomoreilly,项目名称:coi-services,代码行数:29,代码来源:oms_platform_driver.py
示例11: execute_resource
def execute_resource(self, resource_id='', command=None):
"""Execute command on the resource represented by agent.
@param resource_id The id of the resource agennt.
@param command An AgentCommand containing the command.
@retval result An AgentCommandResult containing the result.
@throws BadRequest if the command was malformed.
@throws NotFound if the command is not available in current state.
@throws ResourceError if the resource produced an error during execution.
@param resource_id str
@param command AgentCommand
@retval result AgentCommandResult
@throws BadRequest if the command was malformed.
@throws NotFound if the command is not implemented in the agent.
@throws ResourceError if the resource produced an error.
"""
res_type = self._get_resource_type(resource_id)
if self._has_agent(res_type):
rac = ResourceAgentClient(resource_id=resource_id)
return rac.execute_resource(resource_id=resource_id, command=command)
cmd_res = None
res_interface = self._get_type_interface(res_type)
target = get_safe(res_interface, "commands.%s.execute" % command.command, None)
if target:
res = self._call_execute(target, resource_id, res_type, command.args, command.kwargs)
cmd_res = AgentCommandResult(command_id=command.command_id,
command=command.command,
ts_execute=get_ion_ts(),
status=0)
else:
log.warn("execute_resource(): command %s not defined", command.command)
return cmd_res
开发者ID:blazetopher,项目名称:coi-services,代码行数:35,代码来源:resource_management_service.py
示例12: _policy_thread_loop
def _policy_thread_loop(self):
"""Single thread runs policy loops, to prevent races
"""
while True:
# wait until our event is set, up to policy_interval seconds
self.policy_event.wait(self.policy_interval)
if self.policy_event.is_set():
self.policy_event.clear()
log.debug("%sapplying policy due to event", self.logprefix)
else:
# on a regular basis, we check for the current state of each process.
# this is essentially a hedge against bugs in the HAAgent, or in the
# ION events system that could prevent us from seeing state changes
# of processes.
log.debug("%sapplying policy due to timer. Reloading process cache first.",
self.logprefix)
try:
self.control.reload_processes()
except (Exception, gevent.Timeout):
log.warn("%sFailed to reload processes from PD. Will retry later.",
self.logprefix, exc_info=True)
try:
self._apply_policy()
except (Exception, gevent.Timeout):
log.warn("%sFailed to apply policy. Will retry later.",
self.logprefix, exc_info=True)
开发者ID:Bobfrat,项目名称:coi-services,代码行数:28,代码来源:high_availability_agent.py
示例13: _register_service
def _register_service(self):
definition = self.process_definition
existing_services, _ = self.container.resource_registry.find_resources(
restype="Service", name=definition.name)
if len(existing_services) > 0:
if len(existing_services) > 1:
log.warning("There is more than one service object for %s. Using the first one" % definition.name)
service_id = existing_services[0]._id
else:
svc_obj = Service(name=definition.name, exchange_name=definition.name)
service_id, _ = self.container.resource_registry.create(svc_obj)
svcdefs, _ = self.container.resource_registry.find_resources(
restype="ServiceDefinition", name=definition.name)
if svcdefs:
try:
self.container.resource_registry.create_association(
service_id, "hasServiceDefinition", svcdefs[0]._id)
except BadRequest:
log.warn("Failed to associate %s Service and ServiceDefinition. It probably exists.",
definition.name)
else:
log.error("Cannot find ServiceDefinition resource for %s",
definition.name)
return service_id, definition.name
开发者ID:Bobfrat,项目名称:coi-services,代码行数:29,代码来源:high_availability_agent.py
示例14: _get_data_product_metadata
def _get_data_product_metadata(self, data_product_id=""):
dp_meta_data = {}
if not data_product_id:
raise BadRequest("The data_product_id parameter is missing")
# get the dataset_id and dataset associated with the data_product. Need it to do the data retrieval
ds_ids,_ = self.clients.resource_registry.find_objects(data_product_id, PRED.hasDataset, RT.Dataset, True)
if not ds_ids:
log.warn("Specified data_product does not have an associated dataset")
return None
# Start collecting the data to populate the output dictionary
#time_bounds = self.clients.dataset_management.dataset_bounds(ds_ids[0])['time']
#dp_meta_data['time_bounds'] = [float(ntplib.ntp_to_system_time(i)) for i in time_bounds]
time_bounds = self.clients.dataset_management.dataset_temporal_bounds(ds_ids[0])
dp_meta_data['time_bounds'] = time_bounds
dp_meta_data['time_steps'] = self.clients.dataset_management.dataset_extents(ds_ids[0])['time'][0]
# use the associations to get to the parameter dict
parameter_display_names = {}
for stream_def in self.clients.resource_registry.find_objects(data_product_id, PRED.hasStreamDefinition, id_only=True)[0]:
for pdict in self.clients.resource_registry.find_objects(stream_def, PRED.hasParameterDictionary, id_only=True)[0]:
for context in self.clients.resource_registry.find_objects(pdict, PRED.hasParameterContext, id_only=False)[0]:
#display_name = context.display_name
parameter_display_names[context.name] = context.display_name
dp_meta_data['parameter_display_names'] = parameter_display_names
return simplejson.dumps(dp_meta_data)
开发者ID:ateranishi,项目名称:coi-services,代码行数:31,代码来源:visualization_service.py
示例15: _device_removed_event
def _device_removed_event(self, evt):
"""
Handles the device_removed event to remove associated information and
status updates, which mauy result in events being published.
"""
# the actual child removed is in the values component of the event:
if isinstance(evt.values, (list, tuple)):
# normally it will be just one element but handle as array:
for sub_resource_id in evt.values:
self._remove_child(sub_resource_id)
else:
log.warn("%r: Got device_removed event with invalid values member: %r",
self._platform_id, evt)
return
# finally forward event so ancestors also get notified:
# only adjustment is that now my platform's resource_id is the origin:
evt = dict(event_type = evt.type_,
sub_type = evt.sub_type,
origin_type = evt.origin_type,
origin = self.resource_id,
description = evt.description,
values = evt.values)
try:
log.debug('%r: _device_removed_event: forwarding to ancestors: %s',
self._platform_id, evt)
self._event_publisher.publish_event(**evt)
except Exception:
log.exception('%r: platform agent could not publish event: %s',
self._platform_id, evt)
开发者ID:ednad,项目名称:coi-services,代码行数:33,代码来源:status_manager.py
示例16: _get_new_ctd_packet
def _get_new_ctd_packet(self, stream_id, length):
rdt = RecordDictionaryTool(taxonomy=tx)
#Explicitly make these numpy arrays...
c = numpy.array([random.uniform(0.0,75.0) for i in xrange(length)])
t = numpy.array([random.uniform(-1.7, 21.0) for i in xrange(length)])
p = numpy.array([random.lognormvariate(1,2) for i in xrange(length)])
lat = numpy.array([random.uniform(-90.0, 90.0) for i in xrange(length)])
lon = numpy.array([random.uniform(0.0, 360.0) for i in xrange(length)])
h = numpy.array([random.uniform(0.0, 360.0) for i in xrange(length)])
tvar = numpy.array([self.last_time + i for i in xrange(1,length+1)])
self.last_time = max(tvar)
log.warn('Got time: %s' % str(tvar))
log.warn('Got t: %s' % str(t))
rdt['time'] = tvar
rdt['lat'] = lat
rdt['lon'] = lon
rdt['height'] = h
rdt['temp'] = t
rdt['cond'] = c
rdt['pres'] = p
# rdt['coordinates'] = rdt0
# rdt['data'] = rdt1
g = build_granule(data_producer_id=stream_id, taxonomy=tx, record_dictionary=rdt)
return g
开发者ID:pombredanne,项目名称:coi-services,代码行数:31,代码来源:ctd_stream_publisher.py
示例17: _device_terminated_event
def _device_terminated_event(self, origin):
"""
Reacts to the notification that a child agent has been terminated.
- notifies platform to invalidate the child
- set UNKNOWN for the corresponding child_agg_status
- update rollup_status and do publication in case of change
@param origin the origin (resource_id) of the child
"""
# notify platform:
log.debug("%r: notifying agent _child_terminated: origin=%r", self._platform_id, origin)
self._agent._child_terminated(origin)
if origin not in self.aparam_child_agg_status:
log.warn("%r: OOIION-1077 _device_terminated_event: unrecognized origin=%r", self._platform_id, origin)
return
log.debug("%r: OOIION-1077 _device_terminated_event: origin=%r", self._platform_id, origin)
# set entries to UNKNOWN:
self._initialize_child_agg_status(origin)
# update rollup_status and publish in case of change:
for status_name in AggregateStatusType._str_map.keys():
self._update_rollup_status_and_publish(status_name, origin)
开发者ID:ednad,项目名称:coi-services,代码行数:27,代码来源:status_manager.py
示例18: _trigger_func
def _trigger_func(self, stream_id):
point_def = ctd_stream_definition(stream_id=stream_id)
point_constructor = PointSupplementConstructor(point_definition=point_def)
while True:
length = 1
c = [random.uniform(0.0,75.0) for i in xrange(length)]
t = [random.uniform(-1.7, 21.0) for i in xrange(length)]
p = [random.lognormvariate(1,2) for i in xrange(length)]
lat = [random.uniform(-90.0, 90.0) for i in xrange(length)]
lon = [random.uniform(0.0, 360.0) for i in xrange(length)]
tvar = [self.last_time + i for i in xrange(1,length+1)]
self.last_time = max(tvar)
point_id = point_constructor.add_point(time=tvar,location=(lon[0],lat[0]))
point_constructor.add_point_coverage(point_id=point_id, coverage_id='temperature', values=t)
point_constructor.add_point_coverage(point_id=point_id, coverage_id='pressure', values=p)
point_constructor.add_point_coverage(point_id=point_id, coverage_id='conductivity', values=c)
ctd_packet = point_constructor.get_stream_granule()
log.warn('SimpleCtdPublisher sending %d values!' % length)
self.publisher.publish(ctd_packet)
time.sleep(2.0)
开发者ID:daf,项目名称:coi-services,代码行数:34,代码来源:ctd_stream_publisher.py
示例19: process_packet_cb
def process_packet_cb(packet, route, stream):
if not isinstance(packet, DataPacket):
log.warn("Received a non DataPacket message")
self.recv_packets.append(packet)
self.recv_rows += len(packet.data["data"])
log.info("Received data packet #%s: rows=%s, cols=%s", len(self.recv_packets), len(packet.data["data"]),
packet.data["cols"])
开发者ID:scion-network,项目名称:scion,代码行数:7,代码来源:test_scion_agentdata.py
示例20: get_user
def get_user(username, password, *args, **kwargs):
""" Callback to assert username/password and establish a user session.
Used in password authentication flow. Called only during token creation.
NOTE: If the same username logs in multiple times (e.g. different windows),
then the most recent session overrides all others, which still remain valid
"""
log.info("OAuth:get_user(%s)", username)
if username and password:
try:
actor_id = ui_instance.idm_client.check_actor_credentials(username, password)
actor_user = ui_instance.idm_client.read_actor_identity(actor_id)
if actor_user.details.type_ != OT.UserIdentityDetails:
log.warn("Bad identity details")
return None
# Create user session dict for ActorIdentity
full_name = actor_user.details.contact.individual_names_given + " " + actor_user.details.contact.individual_name_family
valid_until = int(get_ion_ts_millis() / 1000 + ui_instance.session_timeout) # Int with seconds
user_session = {"actor_id": actor_id, "user_id": actor_id, "username": username, "full_name": full_name, "valid_until": valid_until}
actor_user.session = user_session
ui_instance.container.resource_registry.update(actor_user)
flask.g.actor_id = actor_id
return user_session
except NotFound:
pass
return None
开发者ID:IvanHreskiv,项目名称:scioncc,代码行数:29,代码来源:server.py
注:本文中的pyon.public.log.warn函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论