• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python operating_system.read_file函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中trove.guestagent.common.operating_system.read_file函数的典型用法代码示例。如果您正苦于以下问题:Python read_file函数的具体用法?Python read_file怎么用?Python read_file使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了read_file函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _test_file_codec

    def _test_file_codec(self, data, read_codec, write_codec=None,
                         expected_data=None,
                         expected_exception=None,
                         reverse_encoding=False):
        write_codec = write_codec or read_codec

        with tempfile.NamedTemporaryFile() as test_file:
            encode = True
            decode = True
            if reverse_encoding:
                encode = False
                decode = False
            if expected_exception:
                with expected_exception:
                    operating_system.write_file(test_file.name, data,
                                                codec=write_codec,
                                                encode=encode)
                    operating_system.read_file(test_file.name,
                                               codec=read_codec,
                                               decode=decode)
            else:
                operating_system.write_file(test_file.name, data,
                                            codec=write_codec,
                                            encode=encode)
                read = operating_system.read_file(test_file.name,
                                                  codec=read_codec,
                                                  decode=decode)
                if expected_data is not None:
                    self.assertEqual(expected_data, read)
                else:
                    self.assertEqual(data, read)
开发者ID:Tesora,项目名称:tesora-trove,代码行数:31,代码来源:test_operating_system.py


示例2: test_read_write_file_input_validation

    def test_read_write_file_input_validation(self):
        with ExpectedException(exception.UnprocessableEntity, "File does not exist: None"):
            operating_system.read_file(None)

        with ExpectedException(exception.UnprocessableEntity, "File does not exist: /__DOES_NOT_EXIST__"):
            operating_system.read_file("/__DOES_NOT_EXIST__")

        with ExpectedException(exception.UnprocessableEntity, "Invalid path: None"):
            operating_system.write_file(None, {})
开发者ID:mmasaki,项目名称:trove,代码行数:9,代码来源:test_operating_system.py


示例3: get

    def get(self, revision):
        revisions = self._collect_revisions()
        if revisions:
            # Return the difference between this revision and the current base.
            this_revision = operating_system.read_file(revisions[revision - 1], codec=self._codec)
            current_base = operating_system.read_file(self._base_config_path, codec=self._codec)

            return guestagent_utils.dict_difference(this_revision, current_base)

        return {}
开发者ID:cretta,项目名称:trove,代码行数:10,代码来源:configuration.py


示例4: _test_file_codec

    def _test_file_codec(self, data, read_codec, write_codec=None, expected_data=None, expected_exception=None):
        write_codec = write_codec or read_codec

        with tempfile.NamedTemporaryFile() as test_file:
            if expected_exception:
                with expected_exception:
                    operating_system.write_file(test_file.name, data, codec=write_codec)
                    operating_system.read_file(test_file.name, codec=read_codec)
            else:
                operating_system.write_file(test_file.name, data, codec=write_codec)
                read = operating_system.read_file(test_file.name, codec=read_codec)
                if expected_data is not None:
                    self.assertEqual(expected_data, read)
                else:
                    self.assertEqual(data, read)
开发者ID:jjmob,项目名称:trove,代码行数:15,代码来源:test_operating_system.py


示例5: __init__

 def __init__(self):
     self._admin_pwd = None
     self._sys_pwd = None
     self._db_name = None
     self._db_unique_name = None
     self.codec = stream_codecs.IniCodec()
     if not os.path.isfile(self._CONF_FILE):
         operating_system.create_directory(os.path.dirname(self._CONF_FILE),
                                           as_root=True)
         section = {self._CONF_ORA_SEC: {}}
         operating_system.write_file(self._CONF_FILE, section,
                                     codec=self.codec,
                                     as_root=True)
     else:
         config = operating_system.read_file(self._CONF_FILE,
                                             codec=self.codec,
                                             as_root=True)
         try:
             if self._CONF_SYS_KEY in config[self._CONF_ORA_SEC]:
                 self._sys_pwd = config[self._CONF_ORA_SEC][self._CONF_SYS_KEY]
             if self._CONF_ADMIN_KEY in config[self._CONF_ORA_SEC]:
                 self._admin_pwd = config[self._CONF_ORA_SEC][self._CONF_ADMIN_KEY]
             if self._CONF_ROOT_ENABLED in config[self._CONF_ORA_SEC]:
                 self._root_enabled = config[self._CONF_ORA_SEC][self._CONF_ROOT_ENABLED]
             if self._CONF_DB_NAME in config[self._CONF_ORA_SEC]:
                 self._db_name = config[self._CONF_ORA_SEC][self._CONF_DB_NAME]
             if self._CONF_DB_UNIQUE_NAME in config[self._CONF_ORA_SEC]:
                 self._db_unique_name = config[self._CONF_ORA_SEC][self._CONF_DB_UNIQUE_NAME]
         except KeyError:
             # the ORACLE section does not exist, stop parsing
             pass
开发者ID:cdelatte,项目名称:tesora-trove,代码行数:31,代码来源:service.py


示例6: read_module_results

 def read_module_results(cls, is_admin=False, include_contents=False):
     """Read all the module results on the guest and return a list
     of them.
     """
     results = []
     pattern = cls.MODULE_RESULT_FILENAME
     result_files = operating_system.list_files_in_directory(
         cls.MODULE_BASE_DIR, recursive=True, pattern=pattern)
     for result_file in result_files:
         result = cls.read_module_result(result_file)
         if (not result.get('removed') and
                 (is_admin or result.get('visible'))):
             if include_contents:
                 codec = stream_codecs.Base64Codec()
                 # keep admin_only for backwards compatibility
                 if not is_admin and (result.get('is_admin') or
                                      result.get('admin_only')):
                     contents = (
                         "Must be admin to retrieve contents for module %s"
                         % result.get('name', 'Unknown'))
                     result['contents'] = codec.serialize(contents)
                 else:
                     contents_dir = os.path.dirname(result_file)
                     contents_file = cls.build_contents_filename(
                         contents_dir)
                     result['contents'] = operating_system.read_file(
                         contents_file, codec=codec, decode=False)
             results.append(result)
     results.sort(key=operator.itemgetter('updated'), reverse=True)
     return results
开发者ID:Tesora,项目名称:tesora-trove,代码行数:30,代码来源:module_manager.py


示例7: _read_log_position

 def _read_log_position(self):
     backup_var_file = ('%s/backup_variables.txt' %
                        MySqlApp.get_data_dir())
     if operating_system.exists(backup_var_file):
         try:
             LOG.info(_("Reading log position from %s") % backup_var_file)
             backup_vars = operating_system.read_file(
                 backup_var_file,
                 stream_codecs.PropertiesCodec(delimiter='='),
                 as_root=True)
             binlog_position = backup_vars['binlog_position']
             binlog_file, binlog_pos = binlog_position.split(':')
             return {
                 'log_file': binlog_file,
                 'log_position': int(binlog_pos)
             }
         except Exception as ex:
             LOG.exception(ex)
             raise self.UnableToDetermineBinlogPosition(
                 {'binlog_file': backup_var_file})
     else:
         LOG.info(_("Log position detail not available. "
                    "Using default values."))
         return {'log_file': '',
                 'log_position': 4}
开发者ID:Tesora,项目名称:tesora-trove,代码行数:25,代码来源:mysql_ee_binlog.py


示例8: _update_crsconfig_params_hostname

 def _update_crsconfig_params_hostname(self, hostname):
     filepath = path.join(GRID_HOME, 'crs', 'install', 'crsconfig_params')
     contents = re.sub(r'INSTALL_NODE=.*',
                       'INSTALL_NODE={hostname}'.format(hostname=hostname),
                       operating_system.read_file(filepath, as_root=True))
     self.write_oracle_user_file(
         filepath, contents, filemode=operating_system.FileMode.SET_FULL)
开发者ID:Tesora,项目名称:tesora-trove,代码行数:7,代码来源:service.py


示例9: prep_pfile_management

 def prep_pfile_management(self):
     """Generate the base PFILE from the original SPFILE,
     cleanse it of internal settings,
     create a backup spfile,
     and initialize the configuration manager to use it.
     """
     self.admin.create_pfile(target=self.paths.os_pfile, from_memory=True)
     parameters = operating_system.read_file(
         self.paths.os_pfile,
         codec=self.pfile_codec(),
         as_root=True)
     cleansed_parameters = dict()
     for k, v in parameters.items():
         if k.startswith('_'):
             continue
         if v.find('rdbms') != -1:
             continue
         cleansed_parameters[k] = v
     operating_system.write_file(
         self.paths.os_pfile,
         cleansed_parameters,
         codec=self.pfile_codec(),
         as_root=True)
     self.admin.create_spfile(target=self.paths.base_spfile,
                              source=self.paths.os_pfile)
     self._init_configuration_manager()
开发者ID:Tesora,项目名称:tesora-trove,代码行数:26,代码来源:service.py


示例10: apply

 def apply(self, name, datastore, ds_version, data_file, admin_module):
     data = operating_system.read_file(
         data_file, codec=stream_codecs.KeyValueCodec())
     for key, value in data.items():
         if 'message' == key.lower():
             return True, value
     return False, 'Message not found in contents file'
开发者ID:Tesora,项目名称:tesora-trove,代码行数:7,代码来源:ping_driver.py


示例11: mount_storage

    def mount_storage(self, storage_info):
        fstab = path.join('/etc', 'fstab')
        default_mount_options = ('rw,bg,hard,nointr,tcp,vers=3,timeo=600,'
                                 'rsize=32768,wsize=32768,actimeo=0')
        data_mount_options = ('user,tcp,rsize=32768,wsize=32768,hard,intr,'
                              'noac,nfsvers=3')
        if storage_info['type'] == 'nfs':
            sources = storage_info['data']
            data = list()
            if operating_system.exists(fstab):
                data.append(operating_system.read_file(fstab, as_root=True))

            def _line(source, target, options=default_mount_options):
                data.append('{source} {target} nfs {options} 0 0'.format(
                    source=source, target=target, options=options))

            _line(sources['votedisk_mount'], SHARED_DISK_PATHS['votedisk'],)
            _line(sources['registry_mount'], SHARED_DISK_PATHS['registry'],)
            _line(sources['database_mount'], SHARED_DISK_PATHS['database'],
                  data_mount_options)
            operating_system.write_file(fstab, '\n'.join(data),
                                        as_root=True)
            utils.execute_with_timeout('mount', '-a',
                                       run_as_root=True,
                                       root_helper='sudo',
                                       timeout=service.ORACLE_TIMEOUT,
                                       log_output_on_error=True)
        else:
            raise exception.GuestError(_(
                "Storage type {t} not valid.").format(t=storage_info['type']))
开发者ID:Tesora,项目名称:tesora-trove,代码行数:30,代码来源:service.py


示例12: _load_current_superuser

 def _load_current_superuser(self):
     config = operating_system.read_file(self._get_cqlsh_conf_path(),
                                         codec=IniCodec())
     return models.CassandraUser(
         config[self._CONF_AUTH_SEC][self._CONF_USR_KEY],
         config[self._CONF_AUTH_SEC][self._CONF_PWD_KEY]
     )
开发者ID:gongwayne,项目名称:Openstack,代码行数:7,代码来源:service.py


示例13: apply

    def apply(self, group_name, change_id, options):
        self._initialize_import_directory()
        revision_file = self._find_revision_file(group_name, change_id)
        if revision_file is None:
            # Create a new file.
            last_revision_index = self._get_last_file_index(group_name)
            revision_file = guestagent_utils.build_file_path(
                self._revision_dir,
                '%s-%03d-%s' % (group_name, last_revision_index + 1,
                                change_id),
                self._revision_ext)
        else:
            # Update the existing file.
            current = operating_system.read_file(
                revision_file, codec=self._codec, as_root=self._requires_root)
            options = guestagent_utils.update_dict(options, current)

        operating_system.write_file(
            revision_file, options, codec=self._codec,
            as_root=self._requires_root)
        operating_system.chown(
            revision_file, self._owner, self._group,
            as_root=self._requires_root)
        operating_system.chmod(
            revision_file, FileMode.ADD_READ_ALL, as_root=self._requires_root)
开发者ID:Tesora-Release,项目名称:tesora-trove,代码行数:25,代码来源:configuration.py


示例14: _rewind_against_master

    def _rewind_against_master(self, service):
        """Call pg_rewind to resync datadir against state of new master
        We should already have a recovery.conf file in PGDATA
        """
        rconf = operating_system.read_file(
            service.pgsql_recovery_config, codec=stream_codecs.KeyValueCodec(line_terminator="\n"), as_root=True
        )
        conninfo = rconf["primary_conninfo"].strip()

        # The recovery.conf file we want should already be there, but pg_rewind
        # will delete it, so copy it out first
        rec = service.pgsql_recovery_config
        tmprec = "/tmp/recovery.conf.bak"
        operating_system.move(rec, tmprec, as_root=True)

        cmd_full = " ".join(
            [
                "pg_rewind",
                "-D",
                service.pgsql_data_dir,
                "--source-pgdata=" + service.pgsql_data_dir,
                "--source-server=" + conninfo,
            ]
        )
        out, err = utils.execute("sudo", "su", "-", service.pgsql_owner, "-c", "%s" % cmd_full, check_exit_code=0)
        LOG.debug("Got stdout %s and stderr %s from pg_rewind" % (str(out), str(err)))

        operating_system.move(tmprec, rec, as_root=True)
开发者ID:Tesora-Release,项目名称:tesora-trove,代码行数:28,代码来源:postgresql_impl.py


示例15: _get_or_create_replication_user

    def _get_or_create_replication_user(self, service):
        """There are three scenarios we need to deal with here:
        - This is a fresh master, with no replicator user created.
           Generate a new u/p
        - We are attaching a new slave and need to give it the login creds
           Send the creds we have stored in PGDATA/.replpass
        - This is a failed-over-to slave, who will have the replicator user
           but not the credentials file. Recreate the repl user in this case
        """

        LOG.debug("Checking for replicator user")
        pwfile = os.path.join(service.pgsql_data_dir, ".replpass")
        admin = service.build_admin()
        if admin.user_exists(REPL_USER):
            if operating_system.exists(pwfile, as_root=True):
                LOG.debug("Found existing .replpass, returning pw")
                pw = operating_system.read_file(pwfile, as_root=True)
            else:
                LOG.debug("Found user but not .replpass, recreate")
                u = models.PostgreSQLUser(REPL_USER)
                admin._drop_user(context=None, user=u)
                pw = self._create_replication_user(service, admin, pwfile)
        else:
            LOG.debug("Found no replicator user, create one")
            pw = self._create_replication_user(service, admin, pwfile)

        repl_user_info = {"name": REPL_USER, "password": pw}

        return repl_user_info
开发者ID:Tesora-Release,项目名称:tesora-trove,代码行数:29,代码来源:postgresql_impl.py


示例16: parse_updates

    def parse_updates(self):
        parsed_options = {}
        for path in self._collect_revisions():
            options = operating_system.read_file(path, codec=self._codec)
            guestagent_utils.update_dict(options, parsed_options)

        return parsed_options
开发者ID:cretta,项目名称:trove,代码行数:7,代码来源:configuration.py


示例17: _save_value_in_file

 def _save_value_in_file(self, option, value):
     config = operating_system.read_file(
         self.file_path, codec=self._codec, as_root=True)
     name = self.key_names[option]
     config[self.section_name][name] = value
     operating_system.write_file(
         self.file_path, config, codec=self._codec, as_root=True)
开发者ID:Tesora-Release,项目名称:tesora-trove,代码行数:7,代码来源:service.py


示例18: _get_actual_db_status

 def _get_actual_db_status(self):
     if os.path.exists(CONF.get(MANAGER).oracle_ra_status_file):
         status = operating_system.read_file(
             CONF.get(MANAGER).oracle_ra_status_file, as_root=True)
         if status.startswith('OK'):
             return rd_instance.ServiceStatuses.RUNNING
         elif status.startswith('ERROR'):
             return rd_instance.ServiceStatuses.UNKNOWN
开发者ID:Tesora-Release,项目名称:tesora-trove,代码行数:8,代码来源:service.py


示例19: _assert_import_overrides

 def _assert_import_overrides(self, strategy, group_name, overrides, path_builder):
     # Check all override files and their contents,
     for change_id, _, index, expected in overrides:
         expected_path = path_builder(strategy._revision_dir, group_name, change_id, index, strategy._revision_ext)
         self._assert_file_exists(expected_path, True)
         # Assert that the file contents.
         imported = operating_system.read_file(expected_path, codec=strategy._codec)
         self.assertEqual(expected, imported)
开发者ID:jjmob,项目名称:trove,代码行数:8,代码来源:test_configuration.py


示例20: _create_oratab_entry

 def _create_oratab_entry(self):
     """Create in the /etc/oratab file entries for the databases being
     restored"""
     file_content = operating_system.read_file(ORATAB_PATH)
     file_content += ("\n%(db_name)s:%(ora_home)s:N\n" %
                      {'db_name': self.db_name, 'ora_home': ORACLE_HOME})
     operating_system.write_file(ORATAB_PATH, file_content, as_root=True)
     operating_system.chown(ORATAB_PATH, 'oracle', 'oinstall',
                            recursive=True, force=True, as_root=True)
开发者ID:cdelatte,项目名称:tesora-trove,代码行数:9,代码来源:oracle_impl.py



注:本文中的trove.guestagent.common.operating_system.read_file函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python operating_system.remove函数代码示例发布时间:2022-05-27
下一篇:
Python operating_system.move函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap