• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python weeutil.to_bool函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中weeutil.weeutil.to_bool函数的典型用法代码示例。如果您正苦于以下问题:Python to_bool函数的具体用法?Python to_bool怎么用?Python to_bool使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了to_bool函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: run

    def run(self):
        import weeutil.ftpupload

        t1 = time.time()
        if self.skin_dict.has_key('HTML_ROOT'):
            local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                      self.skin_dict['HTML_ROOT'])
        else:
            local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                      self.config_dict['StdReport']['HTML_ROOT'])

        try:
            ftpData = weeutil.ftpupload.FtpUpload(server      = self.skin_dict['server'],
                                                  user        = self.skin_dict['user'],
                                                  password    = self.skin_dict['password'],
                                                  local_root  = local_root,
                                                  remote_root = self.skin_dict['path'],
                                                  port        = int(self.skin_dict.get('port', 21)),
                                                  name        = self.skin_dict['REPORT_NAME'],
                                                  passive     = to_bool(self.skin_dict.get('passive', True)),
                                                  max_tries   = int(self.skin_dict.get('max_tries', 3)),
                                                  secure      = to_bool(self.skin_dict.get('secure_ftp', False)))
        except Exception:
            syslog.syslog(syslog.LOG_DEBUG, "reportengine: FTP upload not requested. Skipped.")
            return

        try:
            N = ftpData.run()
        except (socket.timeout, socket.gaierror, ftplib.all_errors, IOError), e:
            (cl, unused_ob, unused_tr) = sys.exc_info()
            syslog.syslog(syslog.LOG_ERR, "reportengine: Caught exception %s in FtpGenerator; %s." % (cl, e))
            weeutil.weeutil.log_traceback("        ****  ")
            return
开发者ID:jonkjon,项目名称:weewx,代码行数:33,代码来源:reportengine.py


示例2: run

    def run(self):
        import weeutil.rsyncupload
        # We don't try to collect performance statistics about rsync, because rsync
        # will report them for us.  Check the debug log messages.
        try:
            if self.skin_dict.has_key('HTML_ROOT'):
                html_root = self.skin_dict['HTML_ROOT']
            else:
                html_root = self.config_dict['StdReport']['HTML_ROOT']
            rsyncData = weeutil.rsyncupload.RsyncUpload(
                local_root  = os.path.join(self.config_dict['WEEWX_ROOT'], html_root),
                remote_root = self.skin_dict['path'],
                server      = self.skin_dict['server'],
                user        = self.skin_dict.get('user'),
                port        = self.skin_dict.get('port'),
                ssh_options = self.skin_dict.get('ssh_options'),
                compress    = to_bool(self.skin_dict.get('compress', False)),
                delete      = to_bool(self.skin_dict.get('delete', False)))
        except Exception:
            syslog.syslog(syslog.LOG_DEBUG, "reportengine: rsync upload not requested. Skipped.")
            return

        try:
            rsyncData.run()
        except (IOError), e:
            (cl, unused_ob, unused_tr) = sys.exc_info()
            syslog.syslog(syslog.LOG_ERR, "reportengine: Caught exception %s in RsyncGenerator; %s." % (cl, e))
开发者ID:kleopatra999,项目名称:weewx,代码行数:27,代码来源:reportengine.py


示例3: genSchemaOf

 def genSchemaOf(self, table):
     """Return a summary of the schema of the specified table.
     
     If the table does not exist, an exception of type weedb.OperationalError is raised."""
     for row in self.connection.execute("""PRAGMA table_info(%s);""" % table):
         if row[2].upper().startswith('CHAR'):
             coltype = 'STR'
         else:
             coltype = str(row[2]).upper()
         yield (row[0], str(row[1]), coltype, not to_bool(row[3]), row[4], to_bool(row[5]))
开发者ID:MiddleFork,项目名称:weewx,代码行数:10,代码来源:sqlite.py


示例4: setup

 def setup(self):
     self.image_dict = self.skin_dict['ImageGenerator']
     self.title_dict = self.skin_dict.get('Labels', {}).get('Generic', {})
     self.formatter  = weewx.units.Formatter.fromSkinDict(self.skin_dict)
     self.converter  = weewx.units.Converter.fromSkinDict(self.skin_dict)
     # determine how much logging is desired
     self.log_success = to_bool(self.image_dict.get('log_success', True))
开发者ID:CantConfirmOrDeny,项目名称:weewx,代码行数:7,代码来源:imagegenerator.py


示例5: genSchemaOf

 def genSchemaOf(self, table):
     """Return a summary of the schema of the specified table.
     
     If the table does not exist, an exception of type weedb.OperationalError is raised."""
     
     try:
         # Get a cursor directly from MySQL:
         cursor = self.connection.cursor()
         # MySQL throws an exception if you try to show the columns of a
         # non-existing table
         try:
             cursor.execute("""SHOW COLUMNS IN %s;""" % table)
         except _mysql_exceptions.ProgrammingError, e:
             # Table does not exist. Change the exception type:
             raise weedb.OperationalError(e)
         irow = 0
         while True:
             row = cursor.fetchone()
             if row is None: break
             # Append this column to the list of columns.
             colname = str(row[0])
             if row[1].upper()=='DOUBLE':
                 coltype = 'REAL'
             elif row[1].upper().startswith('INT'):
                 coltype = 'INTEGER'
             elif row[1].upper().startswith('CHAR'):
                 coltype = 'STR'
             else:
                 coltype = str(row[1]).upper()
             is_primary = True if row[3] == 'PRI' else False
             yield (irow, colname, coltype, to_bool(row[2]), row[4], is_primary)
             irow += 1
开发者ID:crmorse,项目名称:weewx-waterflow,代码行数:32,代码来源:mysql.py


示例6: __init__

    def __init__(self, host='localhost', user='', password='', database_name='',
                 port=3306, engine=DEFAULT_ENGINE, autocommit=True, **kwargs):
        """Initialize an instance of Connection.

        Parameters:
        
            host: IP or hostname with the mysql database (required)
            user: User name (required)
            password: The password for the username (required)
            database_name: The database to be used. (required)
            port: Its port number (optional; default is 3306)
            engine: The MySQL database engine to use (optional; default is 'INNODB')
            autocommit: If True, autocommit is enabled (default is True)
            kwargs:   Any extra arguments you may wish to pass on to MySQL 
              connect statement. See the file MySQLdb/connections.py for a list (optional).
        """
        connection = MySQLdb.connect(host=host, port=int(port), user=user, passwd=password,
                                     db=database_name, **kwargs)

        weedb.Connection.__init__(self, connection, database_name, 'mysql')

        # Set the storage engine to be used
        set_engine(self.connection, engine)

        # Set the transaction isolation level.
        self.connection.query("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
        self.connection.autocommit(to_bool(autocommit))
开发者ID:ioanszilagyi,项目名称:weewx,代码行数:27,代码来源:mysql.py


示例7: genSchemaOf

    def genSchemaOf(self, table):
        """Return a summary of the schema of the specified table.
        
        If the table does not exist, an exception of type weedb.OperationalError is raised."""

        # Get a cursor directly from MySQL:
        cursor = self.connection.cursor()
        try:
            # If the table does not exist, this will raise a MySQL ProgrammingError exception,
            # which gets converted to a weedb.OperationalError exception by the guard decorator
            cursor.execute("""SHOW COLUMNS IN %s;""" % table)
            irow = 0
            while True:
                row = cursor.fetchone()
                if row is None: break
                # Append this column to the list of columns.
                colname = str(row[0])
                if row[1].upper() == 'DOUBLE':
                    coltype = 'REAL'
                elif row[1].upper().startswith('INT'):
                    coltype = 'INTEGER'
                elif row[1].upper().startswith('CHAR'):
                    coltype = 'STR'
                else:
                    coltype = str(row[1]).upper()
                is_primary = True if row[3] == 'PRI' else False
                can_be_null = False if row[2]=='' else to_bool(row[2])
                yield (irow, colname, coltype, can_be_null, row[4], is_primary)
                irow += 1
        finally:
            cursor.close()
开发者ID:ioanszilagyi,项目名称:weewx,代码行数:31,代码来源:mysql.py


示例8: run

    def run(self):
        copy_dict = self.skin_dict['CopyGenerator']
        # determine how much logging is desired
        log_success = to_bool(copy_dict.get('log_success', True))
        
        copy_list = []

        if self.first_run:
            # Get the list of files to be copied only once, at the first invocation of
            # the generator. Wrap in a try block in case the list does not exist.
            try:
                copy_list += weeutil.weeutil.option_as_list(copy_dict['copy_once'])
            except KeyError:
                pass

        # Get the list of files to be copied everytime. Again, wrap in a try block.
        try:
            copy_list += weeutil.weeutil.option_as_list(copy_dict['copy_always'])
        except KeyError:
            pass

        # Change directory to the skin subdirectory:
        os.chdir(os.path.join(self.config_dict['WEEWX_ROOT'],
                              self.skin_dict['SKIN_ROOT'],
                              self.skin_dict['skin']))
        # Figure out the destination of the files
        html_dest_dir = os.path.join(self.config_dict['WEEWX_ROOT'],
                                     self.skin_dict['HTML_ROOT'])
开发者ID:camattin,项目名称:weewx,代码行数:28,代码来源:reportengine.py


示例9: run

    def run(self):
        """Main entry point for file generation using Cheetah Templates."""

        t1 = time.time()

        self.setup()
        
        # Make a copy of the skin dictionary (we will be modifying it):
        gen_dict = configobj.ConfigObj(self.skin_dict.dict())
        
        # Look for options in [CheetahGenerator],
        section_name = "CheetahGenerator"
        # but accept options from [FileGenerator] for backward compatibility.
        if "FileGenerator" in gen_dict and "CheetahGenerator" not in gen_dict:
            section_name = "FileGenerator"
        
        # The default summary time span is 'None'.
        gen_dict[section_name]['summarize_by'] = 'None'

        # determine how much logging is desired
        log_success = to_bool(gen_dict[section_name].get('log_success', True))

        # configure the search list extensions
        self.initExtensions(gen_dict[section_name])

        # Generate any templates in the given dictionary:
        ngen = self.generate(gen_dict[section_name], self.gen_ts)

        self.teardown()

        elapsed_time = time.time() - t1
        if log_success:
            loginf("Generated %d files for report %s in %.2f seconds" %
                   (ngen, self.skin_dict['REPORT_NAME'], elapsed_time))
开发者ID:MLAB-project,项目名称:weewx,代码行数:34,代码来源:cheetahgenerator.py


示例10: __init__

    def __init__(self, engine, config_dict):
        super(StdArchive, self).__init__(engine, config_dict)

        # Extract the various options from the config file. If it's missing, fill in with defaults:
        if 'StdArchive' in config_dict:
            self.data_binding = config_dict['StdArchive'].get('data_binding', 'wx_binding')
            self.record_generation = config_dict['StdArchive'].get('record_generation', 'hardware').lower()
            self.archive_delay = to_int(config_dict['StdArchive'].get('archive_delay', 15))
            software_interval = to_int(config_dict['StdArchive'].get('archive_interval', 300))
            self.loop_hilo = to_bool(config_dict['StdArchive'].get('loop_hilo', True))
        else:
            self.data_binding = 'wx_binding'
            self.record_generation = 'hardware'
            self.archive_delay = 15
            software_interval = 300
            self.loop_hilo = True
            
        syslog.syslog(syslog.LOG_INFO, "engine: Archive will use data binding %s" % self.data_binding)
        
        syslog.syslog(syslog.LOG_INFO, "engine: Record generation will be attempted in '%s'" % 
                      (self.record_generation,))

        # If the station supports a hardware archive interval, use that.
        # Warn if it is different than what is in config.
        ival_msg = ''
        try:
            if software_interval != self.engine.console.archive_interval:
                syslog.syslog(syslog.LOG_ERR,
                              "engine: The archive interval in the"
                              " configuration file (%d) does not match the"
                              " station hardware interval (%d)." %
                              (software_interval,
                               self.engine.console.archive_interval))
            self.archive_interval = self.engine.console.archive_interval
            ival_msg = "(specified by hardware)"
        except NotImplementedError:
            self.archive_interval = software_interval
            ival_msg = "(specified in weewx configuration)"
        syslog.syslog(syslog.LOG_INFO, "engine: Using archive interval of %d seconds %s" %
                      (self.archive_interval, ival_msg))

        if self.archive_delay <= 0:
            raise weewx.ViolatedPrecondition("Archive delay (%.1f) must be greater than zero." % 
                                             (self.archive_delay,))
        if self.archive_delay >= self.archive_interval / 2:
            syslog.syslog(syslog.LOG_WARNING, "engine: Archive delay (%d) is unusually long" % 
                          (self.archive_delay,))

        syslog.syslog(syslog.LOG_DEBUG, "engine: Use LOOP data in hi/low calculations: %d" % 
                      (self.loop_hilo,))
        
        self.setup_database(config_dict)
        
        self.bind(weewx.STARTUP, self.startup)
        self.bind(weewx.PRE_LOOP, self.pre_loop)
        self.bind(weewx.POST_LOOP, self.post_loop)
        self.bind(weewx.CHECK_LOOP, self.check_loop)
        self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)
        self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
开发者ID:tomdotorg,项目名称:docker-weewx,代码行数:59,代码来源:engine.py


示例11: __init__

    def __init__(self, **stn_dict):
        loginf("version is %s" % DRIVER_VERSION)

        self.xferfile = stn_dict['xferfile']
        self.poll_interval = float(stn_dict.get('poll_interval', 10))
        self.dup_interval = float(stn_dict.get('dup_interval', 5))
        self.max_tries = int(stn_dict.get('max_tries', 5))
        self.retry_wait = int(stn_dict.get('retry_wait', 2))
        self.mode = stn_dict.get('mode', 'direct')
        self.check_calibration = to_bool(
            stn_dict.get('check_calibration', False))
        self.set_calibration = to_bool(stn_dict.get('set_calibration', False))
        self.last_rain_total = None
        self.last_datetime = 0

        if self.mode == 'direct':
            self._station = ObserverIPStation(**stn_dict)
            if self.chkunits(ObserverIPDriver.EXPECTED_UNITS):
                logerr("calibration error: %s is expexted to be %f but is %f" %
                       (i, to_float(calibdata[i]), to_float(stcalib[i])))
                raise Exception("Station units not set correctly")
            if self._station.version() in ObserverIPDriver.SENSOR_MAP:
                self.map = ObserverIPDriver.SENSOR_MAP[self._station.version()]
            else:
                loginf("Unknown firmware version: %s" %
                       self._station.version())
                self.map = ObserverIPDriver.SENSOR_MAP['default']
        else:
            self.map = ObserverIPDriver.SENSOR_MAP['wu']
            if self.check_calibration:
                self._station = ObserverIPStation(**stn_dict)
                if self.chkunits(ObserverIPDriver.EXPECTED_UNITS):
                    raise Exception("Station units not set correctly")

        if 'calibration' in stn_dict and self.check_calibration:
            if self.chkcalib(stn_dict['calibration']):
                if(self.set_calibration):
                    self._station.setcalibration(stn_dict['calibration'])
                    if self.chkcalib(stn_dict['calibration']):
                        raise Exception("Setting calibration unsuccessful")
                else:
                    raise Exception("calibration error")
                
        loginf("polling interval is %s" % self.poll_interval)
开发者ID:matthewwall,项目名称:weewx-observerip,代码行数:44,代码来源:observerip.py


示例12: run

    def run(self):
        """This is where the actual work gets done.

        Runs through the list of reports. """

        if self.gen_ts:
            syslog.syslog(syslog.LOG_DEBUG,
                          "reportengine: Running reports for time %s" %
                          weeutil.weeutil.timestamp_to_string(self.gen_ts))
        else:
            syslog.syslog(syslog.LOG_DEBUG, "reportengine: "
                          "Running reports for latest time in the database.")

        # Iterate over each requested report
        for report in self.config_dict['StdReport'].sections:
            # See if this report is disabled
            enabled = to_bool(self.config_dict['StdReport'][report].get('enable', True))
            if not enabled:
                syslog.syslog(syslog.LOG_DEBUG,
                              "reportengine: Skipping report %s" % report)
                continue

            syslog.syslog(syslog.LOG_DEBUG,
                          "reportengine: Running report %s" % report)

            # Figure out where the configuration file is for the skin used for
            # this report:
            skin_config_path = os.path.join(
                self.config_dict['WEEWX_ROOT'],
                self.config_dict['StdReport']['SKIN_ROOT'],
                self.config_dict['StdReport'][report].get('skin', 'Standard'),
                'skin.conf')

            # Retrieve the configuration dictionary for the skin. Wrap it in
            # a try block in case we fail
            try:
                skin_dict = configobj.ConfigObj(skin_config_path, file_error=True)
                syslog.syslog(
                    syslog.LOG_DEBUG,
                    "reportengine: Found configuration file %s for report %s" %
                    (skin_config_path, report))
            except IOError, e:
                syslog.syslog(
                    syslog.LOG_ERR, "reportengine: "
                    "Cannot read skin configuration file %s for report %s: %s"
                    % (skin_config_path, report, e))
                syslog.syslog(syslog.LOG_ERR, "        ****  Report ignored")
                continue
            except SyntaxError, e:
                syslog.syslog(
                    syslog.LOG_ERR, "reportengine: "
                    "Failed to read skin configuration file %s for report %s: %s"
                    % (skin_config_path, report, e))
                syslog.syslog(syslog.LOG_ERR, "        ****  Report ignored")
                continue
开发者ID:MLAB-project,项目名称:weewx,代码行数:55,代码来源:reportengine.py


示例13: setup

 def setup(self):
     self.image_dict = self.skin_dict['ImageGenerator']
     self.title_dict = self.skin_dict.get('Labels', {}).get('Generic', {})
     self.formatter  = weewx.units.Formatter.fromSkinDict(self.skin_dict)
     self.converter  = weewx.units.Converter.fromSkinDict(self.skin_dict)
     # determine how much logging is desired
     self.log_success = to_bool(self.image_dict.get('log_success', True))
     # ensure that we are in a consistent right location
     os.chdir(os.path.join(self.config_dict['WEEWX_ROOT'],
                           self.skin_dict['SKIN_ROOT'],
                           self.skin_dict['skin']))
开发者ID:tomdotorg,项目名称:docker-weewx,代码行数:11,代码来源:imagegenerator.py


示例14: run

    def run(self):
        import weeutil.ftpupload

        # determine how much logging is desired
        log_success = to_bool(self.skin_dict.get('log_success', True))

        t1 = time.time()
        if 'HTML_ROOT' in self.skin_dict:
            local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                      self.skin_dict['HTML_ROOT'])
        else:
            local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                      self.config_dict['StdReport']['HTML_ROOT'])

        try:
            ftp_data = weeutil.ftpupload.FtpUpload(
                server=self.skin_dict['server'],
                user=self.skin_dict['user'],
                password=self.skin_dict['password'],
                local_root=local_root,
                remote_root=self.skin_dict['path'],
                port=int(self.skin_dict.get('port', 21)),
                name=self.skin_dict['REPORT_NAME'],
                passive=to_bool(self.skin_dict.get('passive', True)),
                max_tries=int(self.skin_dict.get('max_tries', 3)),
                secure=to_bool(self.skin_dict.get('secure_ftp', False)),
                debug=int(self.skin_dict.get('debug', 0)),
                secure_data=to_bool(self.skin_dict.get('secure_data', True)))
        except Exception:
            syslog.syslog(syslog.LOG_DEBUG,
                          "ftpgenerator: FTP upload not requested. Skipped.")
            return

        try:
            n = ftp_data.run()
        except (socket.timeout, socket.gaierror, ftplib.all_errors, IOError), e:
            (cl, unused_ob, unused_tr) = sys.exc_info()
            syslog.syslog(syslog.LOG_ERR, "ftpgenerator: "
                          "Caught exception %s: %s" % (cl, e))
            weeutil.weeutil.log_traceback("        ****  ")
            return
开发者ID:tomdotorg,项目名称:docker-weewx,代码行数:41,代码来源:reportengine.py


示例15: run

    def run(self):
        import user.sftpupload

        # determine how much logging is desired
        log_success = to_bool(self.skin_dict.get('log_success', True))

        t1 = time.time()

        if self.skin_dict.has_key('HTML_ROOT'):
            local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                      self.skin_dict['HTML_ROOT'])
        else:
            local_root = os.path.join(self.config_dict['WEEWX_ROOT'],
                                      self.config_dict['StdReport']['HTML_ROOT'])
        try:

            """Initialize an instance of FtpUpload.
            After initializing, call method run() to perform the upload."""
            sftpData = SFTPUpload(
  
              #print(config_dict)
              #server: The remote server to which the files are to be uploaded.
              server = self.skin_dict['server'],
    
              #user, password : The user name and password that are to be used.
              user  = self.skin_dict['user'],
              password = self.skin_dict['password'],

              #the local_root of the weewx public_html files.
              local_root   = local_root,

              #the remote path we are looking to upload to.
              remote_root  = self.skin_dict.get('path', 'public_html'),

              #name: A unique name to be given for this FTP session. This allows more
              #than one session to be uploading from the same local directory. [Optional.
              #Default is 'FTP'.]
              name = 'SFTP',

              #max_tries: How many times to try creating a directory or uploading
              #a file before giving up [Optional. Default is 3]
              max_tries   = int(self.skin_dict.get('max_tries', 3)),

              #debug: Set to 1 for extra debug information, 0 otherwise.
              debug = int(self.config_dict.get('debug', 1))

            ) #End SFTPUploader Initialisation.

        except Exception, e:
            syslog.syslog(syslog.LOG_DEBUG, "sftp-reportengine: SFTP upload not requested. Skipped.")
            print(e)
            return
开发者ID:davies-barnard,项目名称:sftpupload,代码行数:52,代码来源:sftpupload.py


示例16: __init__

 def __init__(self, **stn_dict):
     loginf('driver version is %s' % DRIVER_VERSION)
     self.model = stn_dict.get('model', 'AcuRite')
     self.max_tries = int(stn_dict.get('max_tries', 10))
     self.retry_wait = int(stn_dict.get('retry_wait', 30))
     self.polling_interval = int(stn_dict.get('polling_interval', 6))
     self.use_constants = to_bool(stn_dict.get('use_constants', False))
     self.ignore_bounds = to_bool(stn_dict.get('ignore_bounds', False))
     if self.use_constants:
         loginf('R2 will be decoded using sensor constants')
         if self.ignore_bounds:
             loginf('R2 bounds on constants will be ignored')
     self.enable_r3 = int(stn_dict.get('enable_r3', 0))
     if self.enable_r3:
         loginf('R3 data will be attempted')
     self.last_rain = None
     self.last_r3 = None
     self.r3_fail_count = 0
     self.r3_max_fail = 3
     self.r1_next_read = 0
     self.r2_next_read = 0
     global DEBUG_RAW
     DEBUG_RAW = int(stn_dict.get('debug_raw', 0))
开发者ID:bakerkj,项目名称:weewx,代码行数:23,代码来源:acurite.py


示例17: __init__

    def __init__(self, queue, database,
                 username=None, password=None,
                 dbadmin_username=None, dbadmin_password=None,
                 line_format='single-line', create_database=True,
                 measurement='record', tags=None,
                 unit_system=None, augment_record=True,
                 inputs=dict(), obs_to_upload='most', append_units_label=True,
                 server_url=_DEFAULT_SERVER_URL, skip_upload=False,
                 manager_dict=None,
                 post_interval=None, max_backlog=sys.maxint, stale=None,
                 log_success=True, log_failure=True,
                 timeout=60, max_tries=3, retry_wait=5):
        super(InfluxThread, self).__init__(queue,
                                           protocol_name='Influx',
                                           manager_dict=manager_dict,
                                           post_interval=post_interval,
                                           max_backlog=max_backlog,
                                           stale=stale,
                                           log_success=log_success,
                                           log_failure=log_failure,
                                           max_tries=max_tries,
                                           timeout=timeout,
                                           retry_wait=retry_wait)
        self.database = database
        self.username = username
        self.password = password
        self.measurement = measurement
        self.tags = tags
        self.obs_to_upload = obs_to_upload
        self.append_units_label = append_units_label
        self.inputs = inputs
        self.server_url = server_url
        self.skip_upload = to_bool(skip_upload)
        self.unit_system = unit_system
        self.augment_record = augment_record
        self.templates = dict()
        self.line_format = line_format

        if create_database:
            uname = None
            pword = None
            if dbadmin_username is not None:
                uname = dbadmin_username
                pword = dbadmin_password
            elif username is not None:
                uname = username
                pword = password
            self.create_database(uname, pword)
开发者ID:matthewwall,项目名称:weewx-influx,代码行数:48,代码来源:influx.py


示例18: __init__

 def __init__(
     self,
     queue,
     username,
     password,
     latitude,
     longitude,
     altitude,
     station_name,
     manager_dict,
     server_url=_SERVER_URL,
     skip_upload=False,
     post_interval=None,
     max_backlog=0,
     stale=None,
     log_success=True,
     log_failure=True,
     timeout=60,
     max_tries=3,
     retry_wait=5,
 ):
     super(OpenWeatherMapThread, self).__init__(
         queue,
         protocol_name="OWM",
         manager_dict=manager_dict,
         post_interval=post_interval,
         max_backlog=max_backlog,
         stale=stale,
         log_success=log_success,
         log_failure=log_failure,
         timeout=timeout,
         max_tries=max_tries,
         retry_wait=retry_wait,
     )
     self.username = username
     self.password = password
     self.latitude = float(latitude)
     self.longitude = float(longitude)
     self.altitude = float(altitude)
     self.station_name = station_name
     self.server_url = server_url
     self.skip_upload = to_bool(skip_upload)
开发者ID:hes19073,项目名称:hesweewx,代码行数:42,代码来源:owm.py


示例19: __init__

    def __init__(self, engine, config_dict):
        super(AS3935, self).__init__(engine, config_dict)
        loginf("service version is %s" % VERSION)
        svc_dict = config_dict.get("AS3935", {})
        addr = int(svc_dict.get("address", 0x03))
        bus = int(svc_dict.get("bus", 1))
        indoors = to_bool(svc_dict.get("indoors", True))
        noise_floor = int(svc_dict.get("noise_floor", 0))
        calib = int(svc_dict.get("calibration", 0x6))
        pin = int(svc_dict.get("pin", 17))
        self.binding = svc_dict.get("data_binding", None)

        self.data = []

        # if a binding was specified, then use it to save strikes to database
        if self.binding is not None:
            # configure the lightning database
            dbm_dict = weewx.manager.get_manager_dict(
                config_dict["DataBindings"],
                config_dict["Databases"],
                self.binding,
                default_binding_dict=get_default_binding_dict(),
            )
            with weewx.manager.open_manager(dbm_dict, initialize=True) as dbm:
                # ensure schema on disk matches schema in memory
                dbcol = dbm.connection.columnsOf(dbm.table_name)
                memcol = [x[0] for x in dbm_dict["schema"]]
                if dbcol != memcol:
                    raise Exception("as3935: schema mismatch: %s != %s" % (dbcol, memcol))

        # configure the gpio and sensor
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(pin, GPIO.IN)
        self.sensor = RPi_AS3935(address=addr, bus=bus)
        self.sensor.set_indoors(indoors)
        self.sensor.set_noise_floor(noise_floor)
        self.sensor.calibrate(tun_cap=calib)

        # add a gpio callback for the lightning strikes
        GPIO.add_event_detect(pin, GPIO.RISING, callback=self.handle_interrupt)
        # on each new archive record, read then clear data since last record
        self.bind(weewx.NEW_ARCHIVE_RECORD, self.read_data)
开发者ID:hes19073,项目名称:hesweewx,代码行数:42,代码来源:as3935.py


示例20: __init__

 def __init__(self, queue, id, key, manager_dict,
              server_url=_SERVER_URL, skip_upload=False,
              post_interval=600, max_backlog=sys.maxint, stale=None,
              log_success=True, log_failure=True,
              timeout=60, max_tries=3, retry_wait=5):
     super(WeatherCloudThread, self).__init__(queue,
                                            protocol_name='WeatherCloud',
                                            manager_dict=manager_dict,
                                            post_interval=post_interval,
                                            max_backlog=max_backlog,
                                            stale=stale,
                                            log_success=log_success,
                                            log_failure=log_failure,
                                            max_tries=max_tries,
                                            timeout=timeout,
                                            retry_wait=retry_wait)
     self.id = id
     self.key = key
     self.server_url = server_url
     self.skip_upload = to_bool(skip_upload)
开发者ID:hes19073,项目名称:hesweewx,代码行数:20,代码来源:wcloud.py



注:本文中的weeutil.weeutil.to_bool函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python weeutil.to_int函数代码示例发布时间:2022-05-26
下一篇:
Python weedb.connect函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap