本文整理汇总了Python中settings.settings_base.SettingsBase类的典型用法代码示例。如果您正苦于以下问题:Python SettingsBase类的具体用法?Python SettingsBase怎么用?Python SettingsBase使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了SettingsBase类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: receive
def receive(self, channel):
"""
Called whenever there is a new sample
Keyword arguments:
channel -- the channel with the new sample
"""
# we always cache
if SettingsBase.get_setting(self, "compact_xml"):
sam = self.__make_compact_xml(channel.name(),channel.get())
else:
sam = self.__make_xml(channel.name(),channel.get())
self.__cache.append(sam)
self.__sample_count += 1
print '%s:%d cache:%s' % (self.__name, self.__sample_count, sam)
# If we have exceeded the sample threshold, notify the thread
# responsible for pushing up data
if self.__sample_count >= SettingsBase.get_setting(self, "sample_threshold"):
# print "idigi_db: Reached threshold of %i, setting event flag" % sample_threshold
self.__threshold_event.set()
return
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:26,代码来源:idigi_upload.py
示例2: apply_settings
def apply_settings(self):
"""\
Called when new configuration settings are available.
Must return tuple of three dictionaries: a dictionary of
accepted settings, a dictionary of rejected settings,
and a dictionary of required settings that were not
found.
"""
try:
if not threading.Thread.isAlive(self):
return
except:
return
if 'update_rate' in accepted and \
accepted['update_rate'] > SHUTDOWN_WAIT:
self.__tracer.warning('Long update_rate setting may ' +
'interfere with shutdown of Dia.')
SettingsBase.merge_settings(self)
accepted, rejected, not_found = SettingsBase.verify_settings(self)
SettingsBase.commit_settings(self, accepted)
return (accepted, rejected, not_found)
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:27,代码来源:autotap_ldvds.py
示例3: apply_settings
def apply_settings(self):
SettingsBase.merge_settings(self)
accepted, rejected, not_found = SettingsBase.verify_settings(self)
if len(rejected) or len(not_found):
# there were problems with settings, terminate early:
self.__tracer.error("Settings rejected/not found: %s %s",
rejected, not_found)
return (accepted, rejected, not_found)
# Verify that the sample predelay time when added to the awake time
# is not over 0xffff.
if accepted['sample_predelay'] + accepted['awake_time_ms'] > 0xffff:
self.__tracer.error("The awake_time_ms value (%d) " +
"and sample_predelay value (%d) " +
"when added together cannot exceed 65535.",
self.__name, accepted['sample_predelay'],
accepted['awake_time_ms'])
rejected['awake_time_ms'] = accepted['awake_time_ms']
del accepted['awake_time_ms']
rejected['sample_predelay'] = accepted['sample_predelay']
del accepted['sample_predelay']
return (accepted, rejected, not_found)
SettingsBase.commit_settings(self, accepted)
return (accepted, rejected, not_found)
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:30,代码来源:digi_temp.py
示例4: run
def run(self):
while self.started_flag:
msg = self.queue.get()
if not self.started_flag:
return
host = SettingsBase.get_setting(self, 'server_address')
port = SettingsBase.get_setting(self, 'port')
frm = SettingsBase.get_setting(self, 'from_address')
to = SettingsBase.get_setting(self, 'to_address')
try:
s = smtplib.SMTP(host, port)
except Exception, e:
self.__tracer.error("Failed to connect to SMTP server")
self.__tracer.warning("If using a DNS name, " + \
"make sure the Digi device is configured" + \
" to use the correct DNS server")
try:
error_list = s.sendmail(frm, to, msg)
except:
self.__tracer.error("Failed to send messages, please double check server/port")
else:
for err in error_list:
self.__tracer.error("Failed to send message to %s address", err)
s.quit()
开发者ID:nikolaijivkov,项目名称:Cndep_Rest_Service__iDigi_Dia,代码行数:29,代码来源:smtp.py
示例5: run
def run(self):
"""run when our device driver thread is started"""
self.sd = None
'''/
import serial
self.sd = serial.Serial(
0, #port number
baudrate=115200, #baudrate
bytesize=serial.EIGHTBITS, #number of databits
parity=serial.PARITY_NONE, #enable parity checking
stopbits=serial.STOPBITS_ONE, #number of stopbits
timeout=3, #set a timeout value
xonxoff=0, #enable software flow control
rtscts=0, #enable RTS/CTS flow control
)
'''
try:
fd = None
fd = open(path + 'id_secret_token.txt', 'r+')
id = fd.readline()
secret = fd.readline()
token = fd.readline()
fd.close()
self.hvconn = HealthVaultConn(path, id, secret, token)
except:
traceback.print_exc()
if fd != None: fd.close()
self.hvconn = None
ip = SettingsBase.get_setting(self, "server_ip")
port = int(SettingsBase.get_setting(self, "server_port"))
server = 'wsgiref'
run_itty(server, ip, port)
开发者ID:nikolaijivkov,项目名称:MS_HV_Presentation__iDigi_Dia,代码行数:34,代码来源:ms_health_vault.py
示例6: run
def run(self):
type = SettingsBase.get_setting(self, "type")
if type == "serial":
from presentations.console.console_serial_server import \
ConsoleSerialServer
server = ConsoleSerialServer(
SettingsBase.get_setting(self, "device"),
SettingsBase.get_setting(self, "baudrate"),
self.__core,
self.__stopevent
)
else:
server = ConsoleTcpServer(('',
SettingsBase.get_setting(self, "port")),
ConsoleTcpRequestHandler, self.__core,
self.__stopevent)
while 1:
if self.__stopevent.isSet():
break
if isinstance(server, ConsoleTcpServer):
r, w, e = select([server.socket], [], [], 1.0)
else:
r = True # Serial ports are always ready
if r:
# Spawns a thread for TCP, blocks for Serial
server.handle_request()
while hasattr(server, "handlers") and len(server.handlers):
# Wait for handlers to exit
time.sleep(1.0)
开发者ID:nikolaijivkov,项目名称:Cndep_Rest_Service__iDigi_Dia,代码行数:32,代码来源:console.py
示例7: run
def run(self):
#Get the device properties
time_sleep = SettingsBase.get_setting(self,"update_rate")
self.input_gpios = SettingsBase.get_setting(self,"input_gpios")
self.output_gpios = SettingsBase.get_setting(self,"output_gpios")
#Call the GPIOs initializer method
self.initialize_gpios()
#Start the refresh thread
while 1:
if self.__stopevent.isSet():
self.__stopevent.clear()
break
try:
while self.setting_gpio:
pass
self.get_GPIOs()
except Exception, e:
self.__tracer.error("Unable to update values: %s", str(e))
#Sleep the time configured in settings (in seconds)
digitime.sleep(time_sleep)
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:26,代码来源:ccwi9p9215_gpio.py
示例8: pre_start
def pre_start(self):
"""\
Do initial base class set up required before start, such as
initialize self._xbee_manager and self._extended_address
"""
if self._xbee_manager is None:
# then initialize things
self._tracer.calls("XBeeBase.pre_start()")
# Fetch the XBee Manager name from the Settings Manager:
dm = self._core.get_service("device_driver_manager")
self._xbee_manager = dm.instance_get(
SettingsBase.get_setting(self, "xbee_device_manager"))
# Register ourselves with the XBee Device Manager instance:
self._xbee_manager.xbee_device_register(self)
# Get the extended address of the device:
self._extended_address = SettingsBase.get_setting(self, "extended_address")
# Create a callback specification that calls back this driver when
# our device has left the configuring state and has transitioned
# to the running state:
xbdm_running_event_spec = XBeeDeviceManagerRunningEventSpec()
xbdm_running_event_spec.cb_set(self.running_indication)
self._xbee_manager.xbee_device_event_spec_add(self,
xbdm_running_event_spec)
# else do nothing
return
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:33,代码来源:xbee_base.py
示例9: apply_settings
def apply_settings(self):
SettingsBase.merge_settings(self)
accepted, rejected, not_found = SettingsBase.verify_settings(self)
# verify exclusive issues
if(accepted['poll_clean_minutes'] > 0):
# then we have exclusive issues to settle
if(accepted['sleep']):
# cannot sleep with clean_minutes active
print "XBeeWatchport: 'sleep' cannot be True if poll_clean_minutes is used."
rejected['sleep'] = accepted['sleep']
del accepted['sleep']
# over-ride / force sample_rate to match clean_minutes
accepted['sample_rate_ms'] = \
accepted['poll_clean_minutes'] * 60000
if len(rejected) or len(not_found):
# there were problems with settings, terminate early:
print "Settings rejected/not found: %s %s" % (rejected, not_found)
return (accepted, rejected, not_found)
SettingsBase.commit_settings(self, accepted)
return (accepted, rejected, not_found)
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:26,代码来源:xbee_watchport_clean.py
示例10: start
def start(self):
"""Start the device driver. Returns bool."""
RobustBase.start_pre(self)
# copy the Modbus Unit ID (slave id) to our Veris H8036 object
x = SettingsBase.get_setting(self, "unit_id")
self._my_tracer.debug("Setting Modbus Unit-ID:%d ", x)
self._H8036.set_ModbusAddress(x)
# create the Dia channels
x = SettingsBase.get_setting(self, "channels")
self._my_tracer.debug("Setting H8036 Mode:%s ", x)
self._H8036.enable_channels(x)
nam_list = self._H8036.get_channel_name_list()
for nam in nam_list:
self._my_tracer.debug("Adding Channel:%s ", nam)
self.add_property(
ChannelSourceDeviceProperty(name=nam,
type=float, initial=Sample(0, 0.0, 'not init'),
perms_mask=DPROP_PERM_GET, options=DPROP_OPT_AUTOTIMESTAMP))
RobustBase.start_post(self)
return True
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:27,代码来源:robust_H8036_sec.py
示例11: start
def start(self):
"""Start the device driver. Returns bool."""
ECM1240_Xbee.start(self)
if self._import_settings():
self.apply_settings()
logger_id = SettingsBase.get_setting(self, 'sunspec_lid')
if (logger_id is None) or (logger_id.lower == 'none'):
self._ecm._lid = None
self._lid_ns = None
#elif logger_id.lower is in ['mac', 'auto']:
# self._ecm._lid = None
# self._lid_ns = 'mac'
else:
self._ecm._lid = logger_id
self._lid_ns = 'mac'
format = SettingsBase.get_setting(self, 'sunspec_format').lower()
if format in ECM1240_SunSpec.XML_FORM_LIST:
self._ecm._xml_form = format
self.__tracer.debug('%s: Selecting XML format "%s".', self.__showname, format)
else:
self.__tracer.error('%s: Unknown XML format "%s".', self.__showname, format)
self._ecm._single_channel = SettingsBase.get_setting(self, 'single_channel')
if self._ecm._single_channel:
self.__tracer.debug('%s: Forcing Single Channel Mode', self.__showname)
return True
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:31,代码来源:sunspec_ecm.py
示例12: start
def start(self):
"""Start the device driver. Returns bool."""
self.__tracer.info("Starting device")
# force desired fixed parameters into our base/parent devices
RobustXSerial.start_pre(self)
# match our private tracer to Robust_Base's family one
self.__tracer.level = self._tracer.level
## Remove the statistic channels if NOT desired
# why add, then remove? Otherwise they don't exist at
# start-up for subscription by other devices/presentations
if not SettingsBase.get_setting(self, "add_statistics"):
self.__tracer.info("Statistic Channels are disabled.")
self.remove_list_of_properties(
["co2_stats", "hum_stats", "tmp_stats"])
self.__co2_stats = None
self.__hum_stats = None
self.__tmp_stats = None
## Force a default IA Modbus config if none
# 'enabling' really only enables this config check
if SettingsBase.get_setting(self, "enable_modbus"):
try:
if rci_modbus.rci_test_for_ia_table():
self.__tracer.info("Detected existing Modbus IA Config.")
else:
if rci_modbus.rci_create_ia_table_mbdia():
self.__tracer.info("Created new Modbus IA Config.")
else:
self.__tracer.error("Modbus IA Config creation FAILED.")
except:
self.__tracer.debug(traceback.format_exc())
self.__tracer.error('Modbus IA Config creation FAILED!')
self.__three_command = SettingsBase.get_setting(self, "three_commands")
# Create a DDO configuration block for this device:
xbee_ddo_cfg = self.get_ddo_block()
# enable/disable the LED statistics channels
if SettingsBase.get_setting(self, "disable_led"):
# disable by setting DIO-10 (p0) to 5/dig-out high
xbee_ddo_cfg.add_parameter('P0', 5)
else:
# enable by setting DIO-10 (p0) to 1/RSSI/PWM
xbee_ddo_cfg.add_parameter('P0', 1)
# Register configuration blocks with the XBee Device Manager:
self.get_xbee_manager().xbee_device_config_block_add(self, xbee_ddo_cfg)
RobustXSerial.start_post(self)
# force garbage collection in case we deleted big things
gc.collect()
return True
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:60,代码来源:ventostat.py
示例13: start
def start(self):
# If the use_default_httpserver setting is specified,
# the presentation will start own http server on the specified port.
isDefault = SettingsBase.get_setting(self, 'use_default_httpserver')
if not globals().has_key('Callback'):
isDefault = False
if isDefault:
self._cb_handle = Callback(self.cb)
self.__tracer.info("using web page %s and"
" using digiweb", self.get_page())
else:
self._cb_handle = self.get_channels
try:
port = SettingsBase.get_setting(self, 'port')
self.__tracer.info("using port %d and BaseHTTPServer", port)
HTTPServer.__init__(self, ('', port), WebRequestHandler)
except Exception:
self.__tracer.debug(traceback.format_exc())
self.socket.close()
# Only start a thread if the Python web-server is
# used:
threading.Thread.start(self)
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:25,代码来源:web.py
示例14: __send_to_idigi
def __send_to_idigi(self, data):
"""
Sends data to iDigi
Keyword arguments:
data - the XML string to send
"""
filename = SettingsBase.get_setting(self, "filename")
filename_format = SettingsBase.get_setting(self, "filename_format")
filename = filename_format % (filename, self.__current_file_number)
collection = SettingsBase.get_setting(self, "collection")
secure = SettingsBase.get_setting(self, "secure")
print "idigi_upload: Uploading %s to iDigi" % filename
success, err, errmsg = idigi_data.send_idigi_data(data, filename, collection, secure)
if not success:
# if successful, delete upload list else try again next time
print "idigi_db: Uploading ERROR %s (%s)" % (err,errmsg)
self.__current_file_number += 1
max_files = SettingsBase.get_setting(self, "file_count")
if self.__current_file_number >= max_files + 1:
self.__current_file_number = 1
return success
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:30,代码来源:idigi_upload.py
示例15: start_post
def start_post(self):
self._tracer.debug("RobustXBee:Start_Post")
hb = SettingsBase.get_setting(self, "heart_beat_sec")
if (hb is not None) and (hb > 0):
# then enable the periodic intake of data productions
self.__xbee_manager.register_sample_listener(self,
self.get_extended_address(), self._heart_beat_indication)
hb = SettingsBase.get_setting(self, "heart_beat_io")
if (hb is not None) and (len(hb) > 0):
# then set a dummy IO to input, but only if HeartBeat active
cfg = self.get_ddo_block()
cfg.add_parameter(hb, 3)
self.get_xbee_manager().xbee_device_config_block_add(
self, cfg)
else: # no heart_beat
try:
# remove the availability channel
self.remove_one_property(self.RXBEE_DEF_AVAIL_CHAN)
# remove the online channel
self.remove_one_property(self.HB_STATUS_CHAN)
except:
pass
# Indicate that we have no more configuration to add:
self.get_xbee_manager().xbee_device_configure(self)
return RobustBase.start_post(self)
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:31,代码来源:robust_xbee.py
示例16: repeat
def repeat(self):
if self.started == False:
time.sleep(60)
interval = SettingsBase.get_setting(self, "interval")
if self.__event_timer is not None:
try:
self.__xbee_manager.xbee_device_schedule_cancel(
self.__event_timer)
except:
pass
self.__event_timer = self.__xbee_manager.xbee_device_schedule_after(
SettingsBase.get_setting(self, "interval"),
self.repeat)
try:
self.__upload_data()
self.connected = 0
except:
self.connected += 1
print "error in sending from repeat function"
开发者ID:Lewiswight,项目名称:MistAway-Gateway,代码行数:31,代码来源:MistAwayUploadSocket.py
示例17: apply_settings
def apply_settings(self):
SettingsBase.merge_settings(self)
accepted, rejected, not_found = SettingsBase.verify_settings(self)
if len(rejected) or len(not_found):
# there were problems with settings, terminate early:
self.__tracer.error("Settings rejected/not found: %s %s",
rejected, not_found)
return (accepted, rejected, not_found)
# Walk each setting, and verify the physical channel type
# is the same type the user specified in their config.
for setting in accepted.copy():
if setting[0:7] == "channel":
try:
channel = setting[7]
operation = setting[9:]
type = digihw.get_channel_type(int(channel) - 1)
if type == self.IO_TYPE_ANALOG:
if operation != "mode":
raise ValueError, "Channel mode is not correct"
elif type == self.IO_TYPE_DIGITAL:
if operation != "dir" and operation != "source":
raise ValueError, "Channel mode is not correct"
else:
raise ValueError, "Channel mode is not correct"
except Exception, e:
self.__tracer.error("Unable to parse settings: %s", e)
rejected[setting] = accepted[setting]
del accepted[setting]
开发者ID:nikolaijivkov,项目名称:Cndep_Rest_Service__iDigi_Dia,代码行数:31,代码来源:local_io.py
示例18: __init__
def __init__(self, settings_flo, settings_filename):
# Provides self.settings and serialization:
settings_list = [
Setting(
name='devices', type=list, required=False, default_value=[]),
Setting(
name='loggers', type=list, required=False, default_value=[]),
Setting(
name='presentations', type=list, required=False,
default_value=[]),
Setting(
name='services', type=list, required=False, default_value=[]),
Setting(
name='tracing', type=list, required=False, default_value=[]),
]
SettingsBase.__init__(self, binding=(), setting_defs=settings_list)
self.__settings_filename = settings_filename
self.__service_map = {}
self.__serializer_ext_map = {}
self.__sleep_req = None # STUB: seconds to wait before power-off
self.__shutdown_event = threading.Event()
# TODO: core may become a thread so we can monitor services and
# attempt to restart them when they fail.
try:
self.epoch(settings_flo)
except KeyboardInterrupt: # pragma: no cover
raise KeyboardInterrupt
except CoreSettingsException:
print "Core: Initial settings invalid, aborting start up..."
sys.exit()
except:
print "Core: Fatal exception caught! Halting execution."
self.request_shutdown()
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:35,代码来源:core_services.py
示例19: start
def start(self):
"""Start the device driver. Returns bool."""
cm = self.__core.get_service("channel_manager")
cp = cm.channel_publisher_get()
cdb = cm.channel_database_get()
left_channel_name = SettingsBase.get_setting(self, LEFT_VOLUME_CHANNEL)
left_channel = cdb.channel_get(left_channel_name)
right_channel_name = SettingsBase.get_setting(self, RIGHT_VOLUME_CHANNEL)
right_channel = cdb.channel_get(right_channel_name)
# Determine initial state:
if left_channel.get().value >= right_channel.get().value:
print "INIT LEFT: volume is %f" % (left_channel.get().value)
self.__state = STATE_PUMP_OUT_LEFT
self.property_set("left_pump_on",
Sample(0, Boolean(True, STYLE_ONOFF)))
else:
print "INIT RIGHT: volume is %f" % (right_channel.get().value)
self.__state = STATE_PUMP_OUT_RIGHT
self.property_set("right_pump_on",
Sample(0, Boolean(True, STYLE_ONOFF)))
# Perform channel subscriptions:
cp.subscribe(left_channel_name,
lambda chan: self.tank_volume_update(chan, LEFT_VOLUME_CHANNEL))
cp.subscribe(right_channel_name,
lambda chan: self.tank_volume_update(chan, RIGHT_VOLUME_CHANNEL))
return True
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:32,代码来源:tank_demo_control.py
示例20: conditional_settings_serializer_load
def conditional_settings_serializer_load(self,
settings_filename="", suffix=""):
"""
Infer which serializer should be loaded and used.
Parameters:
* `settings_filename` - Filename to use to determine
serializer
* `suffix` - Suffix of filename used to determine serializer
Only one of `settings_filename` or `suffix` need be specified
"""
if len(settings_filename):
suffix = os.path.splitext(settings_filename)[1][1:]
elif not len(suffix):
raise CoreSettingsInvalidSerializer
if suffix in [ 'yml', 'yaml' ]:
from settings.settings_serializer_yaml import SettingsSerializerYaml
SettingsBase.register_serializer(self, "yaml",
SettingsSerializerYaml())
self.__serializer_ext_map["yaml"] = "yml"
return 'yaml'
if suffix == 'pyr':
from settings.settings_serializer_py import SettingsSerializerPy
SettingsBase.register_serializer(self, 'pyr',
SettingsSerializerPy())
self.__serializer_ext_map['pyr'] = 'pyr'
return 'pyr'
else:
raise CoreSettingsInvalidSerializer(
"unsupported serializer '%s'" % suffix )
开发者ID:Lewiswight,项目名称:4CT-GW--master,代码行数:34,代码来源:core_services.py
注:本文中的settings.settings_base.SettingsBase类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论