本文整理汇总了Python中weeutil.weeutil.to_bool函数的典型用法代码示例。如果您正苦于以下问题：Python to_bool函数的具体用法？Python to_bool怎么用？Python to_bool使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了to_bool函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: run
def run(self):
    """Upload the report's HTML tree to a remote server using FTP.

    The local root is WEEWX_ROOT joined with HTML_ROOT; HTML_ROOT is taken
    from the skin dictionary when present there, otherwise from the
    [StdReport] section of the configuration dictionary.  Connection
    options are read from the skin dictionary.  If the uploader cannot be
    constructed from those options, a debug message is logged and the
    method returns without uploading.  Errors raised during the upload are
    logged together with a traceback, and the method returns.
    """
    import weeutil.ftpupload

    # NOTE(review): t1 (and N below) are not used in the visible part of
    # this method -- presumably consumed by code beyond this excerpt.
    t1 = time.time()

    # "in" instead of dict.has_key(): has_key() was removed in Python 3.
    if 'HTML_ROOT' in self.skin_dict:
        local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                  self.skin_dict['HTML_ROOT'])
    else:
        local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                  self.config_dict['StdReport']['HTML_ROOT'])

    try:
        ftpData = weeutil.ftpupload.FtpUpload(
            server=self.skin_dict['server'],
            user=self.skin_dict['user'],
            password=self.skin_dict['password'],
            local_root=local_root,
            remote_root=self.skin_dict['path'],
            port=int(self.skin_dict.get('port', 21)),
            name=self.skin_dict['REPORT_NAME'],
            passive=to_bool(self.skin_dict.get('passive', True)),
            max_tries=int(self.skin_dict.get('max_tries', 3)),
            secure=to_bool(self.skin_dict.get('secure_ftp', False)))
    except Exception:
        # Deliberately broad: any failure while gathering the options is
        # treated as "FTP was not configured for this report".
        syslog.syslog(syslog.LOG_DEBUG, "reportengine: FTP upload not requested. Skipped.")
        return

    try:
        N = ftpData.run()
    # "except ... as e" is accepted by Python 2.6+ and Python 3; the
    # original comma form is a syntax error on Python 3.
    except (socket.timeout, socket.gaierror, ftplib.all_errors, IOError) as e:
        (cl, unused_ob, unused_tr) = sys.exc_info()
        syslog.syslog(syslog.LOG_ERR, "reportengine: Caught exception %s in FtpGenerator; %s." % (cl, e))
        weeutil.weeutil.log_traceback(" **** ")
        return
开发者ID:jonkjon,项目名称:weewx,代码行数:33,代码来源:reportengine.py
示例2: run
def run(self):
    """Upload the report's HTML tree to a remote server using rsync.

    HTML_ROOT is taken from the skin dictionary when present there,
    otherwise from the [StdReport] section of the configuration
    dictionary, and is joined onto WEEWX_ROOT to form the local root.
    If the uploader cannot be constructed from the skin options, a debug
    message is logged and the method returns.  An IOError raised during
    the upload is logged.
    """
    import weeutil.rsyncupload

    # We don't try to collect performance statistics about rsync, because
    # rsync will report them for us. Check the debug log messages.
    try:
        # "in" instead of dict.has_key(): has_key() was removed in Python 3.
        if 'HTML_ROOT' in self.skin_dict:
            html_root = self.skin_dict['HTML_ROOT']
        else:
            html_root = self.config_dict['StdReport']['HTML_ROOT']
        rsyncData = weeutil.rsyncupload.RsyncUpload(
            local_root=os.path.join(self.config_dict['WEEWX_ROOT'], html_root),
            remote_root=self.skin_dict['path'],
            server=self.skin_dict['server'],
            user=self.skin_dict.get('user'),
            port=self.skin_dict.get('port'),
            ssh_options=self.skin_dict.get('ssh_options'),
            compress=to_bool(self.skin_dict.get('compress', False)),
            delete=to_bool(self.skin_dict.get('delete', False)))
    except Exception:
        # Deliberately broad: any failure while gathering the options is
        # treated as "rsync was not configured for this report".
        syslog.syslog(syslog.LOG_DEBUG, "reportengine: rsync upload not requested. Skipped.")
        return

    try:
        rsyncData.run()
    # "except ... as e" is accepted by Python 2.6+ and Python 3; the
    # original comma form is a syntax error on Python 3.
    except IOError as e:
        (cl, unused_ob, unused_tr) = sys.exc_info()
        syslog.syslog(syslog.LOG_ERR, "reportengine: Caught exception %s in RsyncGenerator; %s." % (cl, e))
开发者ID:kleopatra999,项目名称:weewx,代码行数:27,代码来源:reportengine.py
示例3: genSchemaOf
def genSchemaOf(self, table):
    """Yield one summary tuple per column of the given table.

    Each tuple is built from a row returned by "PRAGMA table_info": the
    row's first field, its second field as a str, a normalized type name
    ('STR' when the declared type starts with CHAR, otherwise the declared
    type upper-cased), the negation of to_bool() of the fourth field, the
    fifth field unchanged, and to_bool() of the sixth field.

    According to the original documentation, a nonexistent table results
    in weedb.OperationalError; that conversion is not visible here.
    """
    pragma = """PRAGMA table_info(%s);""" % table
    for info in self.connection.execute(pragma):
        declared = info[2].upper()
        coltype = 'STR' if declared.startswith('CHAR') else str(info[2]).upper()
        yield (info[0], str(info[1]), coltype,
               not to_bool(info[3]), info[4], to_bool(info[5]))
开发者ID:MiddleFork,项目名称:weewx,代码行数:10,代码来源:sqlite.py
示例4: setup
def setup(self):
    """Cache the skin options that the image generator reads later.

    Stores the [ImageGenerator] section, the generic labels (an empty
    dict when [Labels] or its [Generic] subsection is missing), a
    formatter and a converter built from the skin dictionary, and the
    log_success flag (default True).
    """
    skin = self.skin_dict
    self.image_dict = skin['ImageGenerator']
    labels = skin.get('Labels', {})
    self.title_dict = labels.get('Generic', {})
    self.formatter = weewx.units.Formatter.fromSkinDict(skin)
    self.converter = weewx.units.Converter.fromSkinDict(skin)
    # Determine how much logging is desired.
    self.log_success = to_bool(self.image_dict.get('log_success', True))
开发者ID:CantConfirmOrDeny,项目名称:weewx,代码行数:7,代码来源:imagegenerator.py
示例5: genSchemaOf
def genSchemaOf(self, table):
    """Yield one summary tuple per column of the given MySQL table.

    Each tuple is (index, column name, normalized type, to_bool() of the
    third field of the SHOW COLUMNS row, the fifth field of that row, and
    whether the fourth field equals 'PRI').  DOUBLE is reported as 'REAL',
    types starting with INT as 'INTEGER', types starting with CHAR as
    'STR'; any other type is reported upper-cased.

    Raises:
        weedb.OperationalError: if MySQL raises a ProgrammingError while
            executing SHOW COLUMNS (e.g. the table does not exist).
    """
    # Get a cursor directly from MySQL. It is acquired before the try so
    # the finally clause never refers to an unbound name.
    cursor = self.connection.cursor()
    try:
        # MySQL throws an exception if you try to show the columns of a
        # non-existing table.
        try:
            cursor.execute("""SHOW COLUMNS IN %s;""" % table)
        except _mysql_exceptions.ProgrammingError as e:
            # Table does not exist. Change the exception type:
            raise weedb.OperationalError(e)

        irow = 0
        while True:
            row = cursor.fetchone()
            if row is None:
                break
            # Summarize this column.
            colname = str(row[0])
            if row[1].upper() == 'DOUBLE':
                coltype = 'REAL'
            elif row[1].upper().startswith('INT'):
                coltype = 'INTEGER'
            elif row[1].upper().startswith('CHAR'):
                coltype = 'STR'
            else:
                coltype = str(row[1]).upper()
            is_primary = row[3] == 'PRI'
            yield (irow, colname, coltype, to_bool(row[2]), row[4], is_primary)
            irow += 1
    finally:
        # Release the cursor even if the consumer abandons the generator
        # early or an exception propagates.
        cursor.close()
开发者ID:crmorse,项目名称:weewx-waterflow,代码行数:32,代码来源:mysql.py
示例6: __init__
def __init__(self, host='localhost', user='', password='', database_name='',
             port=3306, engine=DEFAULT_ENGINE, autocommit=True, **kwargs):
    """Open a MySQL connection and configure it.

    Parameters:
        host: IP or hostname of the MySQL server (default 'localhost').
        user: User name (default '').
        password: Password for the user (default '').
        database_name: Database to use (default '').
        port: Port number; converted with int() (default 3306).
        engine: Storage engine handed to set_engine()
            (default DEFAULT_ENGINE).
        autocommit: Converted with to_bool() and passed to the
            connection's autocommit() (default True).
        kwargs: Extra keyword arguments forwarded unchanged to
            MySQLdb.connect().
    """
    raw_connection = MySQLdb.connect(
        host=host,
        port=int(port),
        user=user,
        passwd=password,
        db=database_name,
        **kwargs
    )
    weedb.Connection.__init__(self, raw_connection, database_name, 'mysql')

    # Select the storage engine to be used.
    set_engine(self.connection, engine)

    # Set the transaction isolation level.
    self.connection.query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")

    want_autocommit = to_bool(autocommit)
    self.connection.autocommit(want_autocommit)
开发者ID:ioanszilagyi,项目名称:weewx,代码行数:27,代码来源:mysql.py
示例7: genSchemaOf
def genSchemaOf(self, table):
    """Yield one summary tuple per column of the given MySQL table.

    Each tuple is (index, column name, normalized type, nullable flag,
    the fifth field of the SHOW COLUMNS row, primary-key flag).  DOUBLE is
    reported as 'REAL', types starting with INT as 'INTEGER', types
    starting with CHAR as 'STR'; any other type is reported upper-cased.
    The nullable flag is False when the third field is the empty string,
    otherwise to_bool() of that field.  The cursor is always closed.

    The original comments state that a MySQL ProgrammingError for a
    missing table is converted to weedb.OperationalError by a guard
    decorator; that decorator is not visible here.
    """
    # Get a cursor directly from MySQL:
    cursor = self.connection.cursor()
    try:
        cursor.execute("""SHOW COLUMNS IN %s;""" % table)
        # fetchone() returns None once the result set is exhausted.
        for irow, row in enumerate(iter(cursor.fetchone, None)):
            colname = str(row[0])

            type_name = row[1].upper()
            if type_name == 'DOUBLE':
                coltype = 'REAL'
            elif type_name.startswith('INT'):
                coltype = 'INTEGER'
            elif type_name.startswith('CHAR'):
                coltype = 'STR'
            else:
                coltype = str(row[1]).upper()

            is_primary = row[3] == 'PRI'

            if row[2] == '':
                can_be_null = False
            else:
                can_be_null = to_bool(row[2])

            yield (irow, colname, coltype, can_be_null, row[4], is_primary)
    finally:
        cursor.close()
开发者ID:ioanszilagyi,项目名称:weewx,代码行数:31,代码来源:mysql.py
示例8: run
def run(self):
    """Collect the files to copy and locate the source and destination.

    Reads the [CopyGenerator] section of the skin dictionary, builds the
    list of files from 'copy_once' (only when self.first_run is true) and
    'copy_always' (missing options are ignored), changes the working
    directory to the skin subdirectory and computes the destination
    directory under WEEWX_ROOT.
    """
    copy_dict = self.skin_dict['CopyGenerator']

    # Determine how much logging is desired.
    log_success = to_bool(copy_dict.get('log_success', True))

    # 'copy_once' is honored only at the first invocation of the
    # generator; 'copy_always' is honored every time.
    if self.first_run:
        option_names = ['copy_once', 'copy_always']
    else:
        option_names = ['copy_always']

    copy_list = []
    for option_name in option_names:
        # Either option may be absent from the section.
        try:
            copy_list += weeutil.weeutil.option_as_list(copy_dict[option_name])
        except KeyError:
            pass

    # Change directory to the skin subdirectory:
    skin_dir = os.path.join(self.config_dict['WEEWX_ROOT'],
                            self.skin_dict['SKIN_ROOT'],
                            self.skin_dict['skin'])
    os.chdir(skin_dir)

    # Figure out the destination of the files
    html_dest_dir = os.path.join(self.config_dict['WEEWX_ROOT'],
                                 self.skin_dict['HTML_ROOT'])
开发者ID:camattin,项目名称:weewx,代码行数:28,代码来源:reportengine.py
示例9: run
def run(self):
    """Generate files from Cheetah templates and optionally log a summary.

    Options are read from the [CheetahGenerator] section of a copy of the
    skin dictionary; [FileGenerator] is used instead when it exists and
    [CheetahGenerator] does not.  When log_success is true (the default),
    the number of generated files and the elapsed time are logged.
    """
    t1 = time.time()
    self.setup()

    # Work on a copy of the skin dictionary, because it gets modified.
    gen_dict = configobj.ConfigObj(self.skin_dict.dict())

    # Prefer [CheetahGenerator]; fall back to [FileGenerator] for backward
    # compatibility.
    legacy_only = "FileGenerator" in gen_dict and "CheetahGenerator" not in gen_dict
    section_name = "FileGenerator" if legacy_only else "CheetahGenerator"
    options = gen_dict[section_name]

    # The default summary time span is 'None'.
    options['summarize_by'] = 'None'

    # Determine how much logging is desired.
    log_success = to_bool(options.get('log_success', True))

    # Configure the search list extensions.
    self.initExtensions(options)

    # Generate any templates in the given dictionary:
    ngen = self.generate(options, self.gen_ts)

    self.teardown()

    elapsed_time = time.time() - t1
    if log_success:
        summary = (ngen, self.skin_dict['REPORT_NAME'], elapsed_time)
        loginf("Generated %d files for report %s in %.2f seconds" % summary)
开发者ID:MLAB-project,项目名称:weewx,代码行数:34,代码来源:cheetahgenerator.py
示例10: __init__
def __init__(self, engine, config_dict):
    """Read the [StdArchive] options, pick the archive interval, bind events.

    Options missing from [StdArchive] (or the whole section being absent)
    fall back to: data_binding 'wx_binding', record_generation 'hardware',
    archive_delay 15, archive_interval 300, loop_hilo True.  The archive
    interval reported by the station console takes precedence over the
    configured one; the configured value is used only when reading the
    console's interval raises NotImplementedError.

    Raises:
        weewx.ViolatedPrecondition: if the archive delay is not positive.
    """
    super(StdArchive, self).__init__(engine, config_dict)

    # Extract the various options from the config file, filling in
    # defaults for anything missing.
    if 'StdArchive' in config_dict:
        archive_dict = config_dict['StdArchive']
        self.data_binding = archive_dict.get('data_binding', 'wx_binding')
        self.record_generation = archive_dict.get('record_generation', 'hardware').lower()
        self.archive_delay = to_int(archive_dict.get('archive_delay', 15))
        software_interval = to_int(archive_dict.get('archive_interval', 300))
        self.loop_hilo = to_bool(archive_dict.get('loop_hilo', True))
    else:
        self.data_binding = 'wx_binding'
        self.record_generation = 'hardware'
        self.archive_delay = 15
        software_interval = 300
        self.loop_hilo = True

    syslog.syslog(syslog.LOG_INFO,
                  "engine: Archive will use data binding %s" % self.data_binding)
    syslog.syslog(syslog.LOG_INFO,
                  "engine: Record generation will be attempted in '%s'" % (self.record_generation,))

    # If the station supports a hardware archive interval, use that.
    # Warn if it differs from what is in the configuration.
    ival_msg = ''
    try:
        if software_interval != self.engine.console.archive_interval:
            mismatch = (software_interval, self.engine.console.archive_interval)
            syslog.syslog(
                syslog.LOG_ERR,
                "engine: The archive interval in the configuration file (%d)"
                " does not match the station hardware interval (%d)." % mismatch)
        self.archive_interval = self.engine.console.archive_interval
        ival_msg = "(specified by hardware)"
    except NotImplementedError:
        self.archive_interval = software_interval
        ival_msg = "(specified in weewx configuration)"
    syslog.syslog(syslog.LOG_INFO,
                  "engine: Using archive interval of %d seconds %s" % (self.archive_interval, ival_msg))

    if self.archive_delay <= 0:
        raise weewx.ViolatedPrecondition(
            "Archive delay (%.1f) must be greater than zero." % (self.archive_delay,))
    if self.archive_delay >= self.archive_interval / 2:
        syslog.syslog(syslog.LOG_WARNING,
                      "engine: Archive delay (%d) is unusually long" % (self.archive_delay,))

    syslog.syslog(syslog.LOG_DEBUG,
                  "engine: Use LOOP data in hi/low calculations: %d" % (self.loop_hilo,))

    self.setup_database(config_dict)

    # Register the event handlers.
    self.bind(weewx.STARTUP, self.startup)
    self.bind(weewx.PRE_LOOP, self.pre_loop)
    self.bind(weewx.POST_LOOP, self.post_loop)
    self.bind(weewx.CHECK_LOOP, self.check_loop)
    self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)
    self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
开发者ID:tomdotorg,项目名称:docker-weewx,代码行数:59,代码来源:engine.py
示例11: __init__
def __init__(self, **stn_dict):
    """Configure the driver from the station options.

    Options read from stn_dict: 'xferfile' (required), 'poll_interval'
    (default 10), 'dup_interval' (default 5), 'max_tries' (default 5),
    'retry_wait' (default 2), 'mode' (default 'direct'),
    'check_calibration' and 'set_calibration' (both default False) and,
    optionally, 'calibration'.

    In 'direct' mode a station object is always created, its units are
    checked, and the sensor map is chosen by firmware version (falling
    back to the 'default' map).  In any other mode the 'wu' map is used
    and the station is only created when check_calibration is true.

    Raises:
        Exception: if the unit check fails, if the calibration differs
            and set_calibration is false, or if the calibration still
            differs after it was set.
    """
    loginf("version is %s" % DRIVER_VERSION)
    self.xferfile = stn_dict['xferfile']
    self.poll_interval = float(stn_dict.get('poll_interval', 10))
    self.dup_interval = float(stn_dict.get('dup_interval', 5))
    self.max_tries = int(stn_dict.get('max_tries', 5))
    self.retry_wait = int(stn_dict.get('retry_wait', 2))
    self.mode = stn_dict.get('mode', 'direct')
    self.check_calibration = to_bool(
        stn_dict.get('check_calibration', False))
    self.set_calibration = to_bool(stn_dict.get('set_calibration', False))
    self.last_rain_total = None
    self.last_datetime = 0

    if self.mode == 'direct':
        self._station = ObserverIPStation(**stn_dict)
        if self.chkunits(ObserverIPDriver.EXPECTED_UNITS):
            # The previous log call here formatted i, calibdata and
            # stcalib, none of which exist in this scope, so it raised
            # NameError instead of reporting the unit problem.
            logerr("station units not set correctly")
            raise Exception("Station units not set correctly")
        if self._station.version() in ObserverIPDriver.SENSOR_MAP:
            self.map = ObserverIPDriver.SENSOR_MAP[self._station.version()]
        else:
            loginf("Unknown firmware version: %s" %
                   self._station.version())
            self.map = ObserverIPDriver.SENSOR_MAP['default']
    else:
        self.map = ObserverIPDriver.SENSOR_MAP['wu']
        if self.check_calibration:
            self._station = ObserverIPStation(**stn_dict)
            if self.chkunits(ObserverIPDriver.EXPECTED_UNITS):
                raise Exception("Station units not set correctly")

    if 'calibration' in stn_dict and self.check_calibration:
        # A true result from chkcalib() is treated as a mismatch.
        if self.chkcalib(stn_dict['calibration']):
            if self.set_calibration:
                self._station.setcalibration(stn_dict['calibration'])
                # Check again to confirm the new calibration took effect.
                if self.chkcalib(stn_dict['calibration']):
                    raise Exception("Setting calibration unsuccessful")
            else:
                raise Exception("calibration error")

    loginf("polling interval is %s" % self.poll_interval)
开发者ID:matthewwall,项目名称:weewx-observerip,代码行数:44,代码来源:observerip.py
示例12: run
def run(self):
    """Run through the reports listed under [StdReport].

    For each report section: skip it when its 'enable' option is false
    (default True); otherwise locate the skin's skin.conf under
    WEEWX_ROOT/SKIN_ROOT/<skin> (skin defaults to 'Standard') and load
    it.  A report whose skin configuration cannot be read or parsed is
    logged and ignored.
    """
    if self.gen_ts:
        syslog.syslog(syslog.LOG_DEBUG,
                      "reportengine: Running reports for time %s" %
                      weeutil.weeutil.timestamp_to_string(self.gen_ts))
    else:
        syslog.syslog(syslog.LOG_DEBUG, "reportengine: "
                      "Running reports for latest time in the database.")

    # Iterate over each requested report
    for report in self.config_dict['StdReport'].sections:
        # See if this report is disabled
        enabled = to_bool(self.config_dict['StdReport'][report].get('enable', True))
        if not enabled:
            syslog.syslog(syslog.LOG_DEBUG,
                          "reportengine: Skipping report %s" % report)
            continue

        syslog.syslog(syslog.LOG_DEBUG,
                      "reportengine: Running report %s" % report)

        # Figure out where the configuration file is for the skin used for
        # this report:
        skin_config_path = os.path.join(
            self.config_dict['WEEWX_ROOT'],
            self.config_dict['StdReport']['SKIN_ROOT'],
            self.config_dict['StdReport'][report].get('skin', 'Standard'),
            'skin.conf')

        # Retrieve the configuration dictionary for the skin. Wrap it in
        # a try block in case we fail.
        try:
            skin_dict = configobj.ConfigObj(skin_config_path, file_error=True)
            syslog.syslog(
                syslog.LOG_DEBUG,
                "reportengine: Found configuration file %s for report %s" %
                (skin_config_path, report))
        # "except ... as e" is accepted by Python 2.6+ and Python 3; the
        # original comma form is a syntax error on Python 3.
        except IOError as e:
            syslog.syslog(
                syslog.LOG_ERR, "reportengine: "
                "Cannot read skin configuration file %s for report %s: %s"
                % (skin_config_path, report, e))
            syslog.syslog(syslog.LOG_ERR, " **** Report ignored")
            continue
        except SyntaxError as e:
            syslog.syslog(
                syslog.LOG_ERR, "reportengine: "
                "Failed to read skin configuration file %s for report %s: %s"
                % (skin_config_path, report, e))
            syslog.syslog(syslog.LOG_ERR, " **** Report ignored")
            continue
开发者ID:MLAB-project,项目名称:weewx,代码行数:55,代码来源:reportengine.py
示例13: setup
def setup(self):
    """Cache the skin options and move into the skin directory.

    Stores the [ImageGenerator] section, the generic labels (an empty
    dict when [Labels] or its [Generic] subsection is missing), a
    formatter and a converter built from the skin dictionary, and the
    log_success flag (default True); then changes the working directory
    to WEEWX_ROOT/SKIN_ROOT/<skin>.
    """
    skin = self.skin_dict
    self.image_dict = skin['ImageGenerator']
    labels = skin.get('Labels', {})
    self.title_dict = labels.get('Generic', {})
    self.formatter = weewx.units.Formatter.fromSkinDict(skin)
    self.converter = weewx.units.Converter.fromSkinDict(skin)

    # Determine how much logging is desired.
    self.log_success = to_bool(self.image_dict.get('log_success', True))

    # Ensure that we are in a consistent location.
    skin_dir = os.path.join(self.config_dict['WEEWX_ROOT'],
                            self.skin_dict['SKIN_ROOT'],
                            self.skin_dict['skin'])
    os.chdir(skin_dir)
开发者ID:tomdotorg,项目名称:docker-weewx,代码行数:11,代码来源:imagegenerator.py
示例14: run
def run(self):
    """Upload the report's HTML tree to a remote server using FTP.

    The local root is WEEWX_ROOT joined with HTML_ROOT; HTML_ROOT is taken
    from the skin dictionary when present there, otherwise from the
    [StdReport] section of the configuration dictionary.  Connection
    options are read from the skin dictionary.  If the uploader cannot be
    constructed from those options, a debug message is logged and the
    method returns without uploading.  Errors raised during the upload are
    logged together with a traceback, and the method returns.
    """
    import weeutil.ftpupload

    # Determine how much logging is desired.
    # NOTE(review): log_success, t1 and n are not used in the visible part
    # of this method -- presumably consumed by code beyond this excerpt.
    log_success = to_bool(self.skin_dict.get('log_success', True))

    t1 = time.time()

    if 'HTML_ROOT' in self.skin_dict:
        local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                  self.skin_dict['HTML_ROOT'])
    else:
        local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                  self.config_dict['StdReport']['HTML_ROOT'])

    try:
        ftp_data = weeutil.ftpupload.FtpUpload(
            server=self.skin_dict['server'],
            user=self.skin_dict['user'],
            password=self.skin_dict['password'],
            local_root=local_root,
            remote_root=self.skin_dict['path'],
            port=int(self.skin_dict.get('port', 21)),
            name=self.skin_dict['REPORT_NAME'],
            passive=to_bool(self.skin_dict.get('passive', True)),
            max_tries=int(self.skin_dict.get('max_tries', 3)),
            secure=to_bool(self.skin_dict.get('secure_ftp', False)),
            debug=int(self.skin_dict.get('debug', 0)),
            secure_data=to_bool(self.skin_dict.get('secure_data', True)))
    except Exception:
        # Deliberately broad: any failure while gathering the options is
        # treated as "FTP was not configured for this report".
        syslog.syslog(syslog.LOG_DEBUG,
                      "ftpgenerator: FTP upload not requested. Skipped.")
        return

    try:
        n = ftp_data.run()
    # "except ... as e" is accepted by Python 2.6+ and Python 3; the
    # original comma form is a syntax error on Python 3.
    except (socket.timeout, socket.gaierror, ftplib.all_errors, IOError) as e:
        (cl, unused_ob, unused_tr) = sys.exc_info()
        syslog.syslog(syslog.LOG_ERR, "ftpgenerator: "
                      "Caught exception %s: %s" % (cl, e))
        weeutil.weeutil.log_traceback(" **** ")
        return
开发者ID:tomdotorg,项目名称:docker-weewx,代码行数:41,代码来源:reportengine.py
示例15: run
def run(self):
    """Build the SFTP uploader for the report's HTML tree.

    The local root is WEEWX_ROOT joined with HTML_ROOT; HTML_ROOT is taken
    from the skin dictionary when present there, otherwise from the
    [StdReport] section of the configuration dictionary.  If the uploader
    cannot be constructed, a debug message is logged, the exception is
    printed, and the method returns.
    """
    import user.sftpupload

    # Determine how much logging is desired.
    # NOTE(review): log_success, t1 and sftpData are not used in the
    # visible part of this method -- presumably consumed by code beyond
    # this excerpt.
    log_success = to_bool(self.skin_dict.get('log_success', True))

    t1 = time.time()

    # "in" instead of dict.has_key(): has_key() was removed in Python 3.
    if 'HTML_ROOT' in self.skin_dict:
        local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                  self.skin_dict['HTML_ROOT'])
    else:
        local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                  self.config_dict['StdReport']['HTML_ROOT'])

    try:
        # Initialize an instance of SFTPUpload.
        # NOTE(review): SFTPUpload is referenced unqualified although only
        # user.sftpupload is imported here -- presumably it is defined at
        # module level in this file; confirm.
        sftpData = SFTPUpload(
            # server: The remote server to which the files are to be uploaded.
            server=self.skin_dict['server'],
            # user, password: The user name and password that are to be used.
            user=self.skin_dict['user'],
            password=self.skin_dict['password'],
            # The local root of the weewx public_html files.
            local_root=local_root,
            # The remote path to upload to.
            remote_root=self.skin_dict.get('path', 'public_html'),
            # name: A name for this session, fixed to 'SFTP' here.
            name='SFTP',
            # max_tries: How many times to try creating a directory or
            # uploading a file before giving up (default 3).
            max_tries=int(self.skin_dict.get('max_tries', 3)),
            # debug: Read from the top-level configuration dictionary
            # (default 1).
            debug=int(self.config_dict.get('debug', 1))
        )
    # "except ... as e" is accepted by Python 2.6+ and Python 3; the
    # original comma form is a syntax error on Python 3.
    except Exception as e:
        syslog.syslog(syslog.LOG_DEBUG, "sftp-reportengine: SFTP upload not requested. Skipped.")
        print(e)
        return
开发者ID:davies-barnard,项目名称:sftpupload,代码行数:52,代码来源:sftpupload.py
示例16: __init__
def __init__(self, **stn_dict):
    """Configure the driver from the station options.

    Options read from stn_dict: 'model' (default 'AcuRite'), 'max_tries'
    (default 10), 'retry_wait' (default 30), 'polling_interval'
    (default 6), 'use_constants' and 'ignore_bounds' (both default
    False), 'enable_r3' (default 0) and 'debug_raw' (default 0), the last
    of which is stored in the module-level DEBUG_RAW.
    """
    global DEBUG_RAW

    option = stn_dict.get

    loginf('driver version is %s' % DRIVER_VERSION)
    self.model = option('model', 'AcuRite')
    self.max_tries = int(option('max_tries', 10))
    self.retry_wait = int(option('retry_wait', 30))
    self.polling_interval = int(option('polling_interval', 6))

    self.use_constants = to_bool(option('use_constants', False))
    self.ignore_bounds = to_bool(option('ignore_bounds', False))
    if self.use_constants:
        loginf('R2 will be decoded using sensor constants')
        if self.ignore_bounds:
            loginf('R2 bounds on constants will be ignored')

    self.enable_r3 = int(option('enable_r3', 0))
    if self.enable_r3:
        loginf('R3 data will be attempted')

    # Initial values for the driver's rain, R3 and read-scheduling state.
    self.last_rain = None
    self.last_r3 = None
    self.r3_fail_count = 0
    self.r3_max_fail = 3
    self.r1_next_read = 0
    self.r2_next_read = 0

    DEBUG_RAW = int(option('debug_raw', 0))
开发者ID:bakerkj,项目名称:weewx,代码行数:23,代码来源:acurite.py
示例17: __init__
def __init__(self, queue, database,
             username=None, password=None,
             dbadmin_username=None, dbadmin_password=None,
             line_format='single-line', create_database=True,
             measurement='record', tags=None,
             unit_system=None, augment_record=True,
             inputs=None, obs_to_upload='most', append_units_label=True,
             server_url=_DEFAULT_SERVER_URL, skip_upload=False,
             manager_dict=None,
             post_interval=None, max_backlog=sys.maxsize, stale=None,
             log_success=True, log_failure=True,
             timeout=60, max_tries=3, retry_wait=5):
    """Store the upload options and optionally create the database.

    The queue and the generic posting options (manager_dict,
    post_interval, max_backlog, stale, log_success, log_failure,
    max_tries, timeout, retry_wait) are forwarded to the parent class
    with protocol name 'Influx'.  The remaining arguments are stored on
    the instance; skip_upload is converted with to_bool().

    inputs defaults to None and is replaced by a fresh dict for each
    instance, so instances never share one mutable default.  max_backlog
    defaults to sys.maxsize, because sys.maxint does not exist on
    Python 3.

    When create_database is true, self.create_database() is called with
    the admin credentials if dbadmin_username is given, otherwise with
    the regular credentials if username is given, otherwise with
    (None, None).
    """
    super(InfluxThread, self).__init__(queue,
                                       protocol_name='Influx',
                                       manager_dict=manager_dict,
                                       post_interval=post_interval,
                                       max_backlog=max_backlog,
                                       stale=stale,
                                       log_success=log_success,
                                       log_failure=log_failure,
                                       max_tries=max_tries,
                                       timeout=timeout,
                                       retry_wait=retry_wait)
    self.database = database
    self.username = username
    self.password = password
    self.measurement = measurement
    self.tags = tags
    self.obs_to_upload = obs_to_upload
    self.append_units_label = append_units_label
    self.inputs = dict() if inputs is None else inputs
    self.server_url = server_url
    self.skip_upload = to_bool(skip_upload)
    self.unit_system = unit_system
    self.augment_record = augment_record
    self.templates = dict()
    self.line_format = line_format

    if create_database:
        # Prefer the admin credentials; fall back to the regular ones.
        uname = None
        pword = None
        if dbadmin_username is not None:
            uname = dbadmin_username
            pword = dbadmin_password
        elif username is not None:
            uname = username
            pword = password
        self.create_database(uname, pword)
开发者ID:matthewwall,项目名称:weewx-influx,代码行数:48,代码来源:influx.py
示例18: __init__
def __init__(self, queue, username, password,
             latitude, longitude, altitude, station_name,
             manager_dict,
             server_url=_SERVER_URL, skip_upload=False,
             post_interval=None, max_backlog=0, stale=None,
             log_success=True, log_failure=True,
             timeout=60, max_tries=3, retry_wait=5):
    """Store the station identity and upload options.

    The queue and the generic posting options are forwarded to the parent
    class with protocol name "OWM".  latitude, longitude and altitude are
    converted with float(); skip_upload is converted with to_bool().
    """
    parent_options = dict(
        protocol_name="OWM",
        manager_dict=manager_dict,
        post_interval=post_interval,
        max_backlog=max_backlog,
        stale=stale,
        log_success=log_success,
        log_failure=log_failure,
        timeout=timeout,
        max_tries=max_tries,
        retry_wait=retry_wait,
    )
    super(OpenWeatherMapThread, self).__init__(queue, **parent_options)

    # Credentials
    self.username = username
    self.password = password

    # Station location
    self.latitude = float(latitude)
    self.longitude = float(longitude)
    self.altitude = float(altitude)

    self.station_name = station_name
    self.server_url = server_url
    self.skip_upload = to_bool(skip_upload)
开发者ID:hes19073,项目名称:hesweewx,代码行数:42,代码来源:owm.py
示例19: __init__
def __init__(self, engine, config_dict):
    """Configure the service, the optional database, and the sensor.

    Options are read from the [AS3935] section (an empty dict when the
    section is missing): 'address' (default 0x03), 'bus' (default 1),
    'indoors' (default True), 'noise_floor' (default 0), 'calibration'
    (default 0x6), 'pin' (default 17) and 'data_binding' (default None).

    Raises:
        Exception: if a data binding is given and the columns on disk do
            not match the columns of the in-memory schema.
    """
    super(AS3935, self).__init__(engine, config_dict)
    loginf("service version is %s" % VERSION)

    svc_dict = config_dict.get("AS3935", {})
    option = svc_dict.get
    addr = int(option("address", 0x03))
    bus = int(option("bus", 1))
    indoors = to_bool(option("indoors", True))
    noise_floor = int(option("noise_floor", 0))
    calib = int(option("calibration", 0x6))
    pin = int(option("pin", 17))
    self.binding = option("data_binding", None)
    self.data = []

    # If a binding was specified, the database behind it must agree with
    # the in-memory schema.
    if self.binding is not None:
        # Configure the lightning database.
        dbm_dict = weewx.manager.get_manager_dict(
            config_dict["DataBindings"],
            config_dict["Databases"],
            self.binding,
            default_binding_dict=get_default_binding_dict(),
        )
        with weewx.manager.open_manager(dbm_dict, initialize=True) as dbm:
            # Ensure the schema on disk matches the schema in memory.
            dbcol = dbm.connection.columnsOf(dbm.table_name)
            memcol = [column[0] for column in dbm_dict["schema"]]
            if dbcol != memcol:
                raise Exception("as3935: schema mismatch: %s != %s" % (dbcol, memcol))

    # Configure the GPIO pin and the sensor.
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(pin, GPIO.IN)
    sensor = RPi_AS3935(address=addr, bus=bus)
    self.sensor = sensor
    sensor.set_indoors(indoors)
    sensor.set_noise_floor(noise_floor)
    sensor.calibrate(tun_cap=calib)

    # Call handle_interrupt on each rising edge of the pin.
    GPIO.add_event_detect(pin, GPIO.RISING, callback=self.handle_interrupt)

    # On each new archive record, call read_data.
    self.bind(weewx.NEW_ARCHIVE_RECORD, self.read_data)
开发者ID:hes19073,项目名称:hesweewx,代码行数:42,代码来源:as3935.py
示例20: __init__
def __init__(self, queue, id, key, manager_dict,
             server_url=_SERVER_URL, skip_upload=False,
             post_interval=600, max_backlog=sys.maxsize, stale=None,
             log_success=True, log_failure=True,
             timeout=60, max_tries=3, retry_wait=5):
    """Store the WeatherCloud credentials and upload options.

    The queue and the generic posting options are forwarded to the parent
    class with protocol name 'WeatherCloud'.  id, key and server_url are
    stored unchanged; skip_upload is converted with to_bool().

    max_backlog defaults to sys.maxsize, because sys.maxint does not
    exist on Python 3.  The parameter name "id" shadows the builtin but
    is kept because callers may pass it by keyword.
    """
    super(WeatherCloudThread, self).__init__(queue,
                                             protocol_name='WeatherCloud',
                                             manager_dict=manager_dict,
                                             post_interval=post_interval,
                                             max_backlog=max_backlog,
                                             stale=stale,
                                             log_success=log_success,
                                             log_failure=log_failure,
                                             max_tries=max_tries,
                                             timeout=timeout,
                                             retry_wait=retry_wait)
    self.id = id
    self.key = key
    self.server_url = server_url
    self.skip_upload = to_bool(skip_upload)
开发者ID:hes19073,项目名称:hesweewx,代码行数:20,代码来源:wcloud.py
注：本文中的weeutil.weeutil.to_bool函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论