• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python guestagent_utils.update_dict函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中trove.guestagent.common.guestagent_utils.update_dict函数的典型用法代码示例。如果您正苦于以下问题:Python update_dict函数的具体用法?Python update_dict怎么用?Python update_dict使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了update_dict函数的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: parse_updates

    def parse_updates(self):
        parsed_options = {}
        for path in self._collect_revisions():
            options = operating_system.read_file(path, codec=self._codec)
            guestagent_utils.update_dict(options, parsed_options)

        return parsed_options
开发者ID:cretta,项目名称:trove,代码行数:7,代码来源:configuration.py


示例2: update_acl

 def update_acl(username, keyspace, permission, acl):
     permissions = acl.get(username, {}).get(keyspace)
     if permissions is None:
         guestagent_utils.update_dict({user: {keyspace: {permission}}},
                                      acl)
     else:
         permissions.add(permission)
开发者ID:gongwayne,项目名称:Openstack,代码行数:7,代码来源:service.py


示例3: _configure_network

    def _configure_network(self, port=None):
        """Make the service accessible at a given (or default if not) port.
        """
        instance_ip = netutils.get_my_ipv4()
        bind_interfaces_string = ','.join([instance_ip, '127.0.0.1'])
        options = {'net.bindIp': bind_interfaces_string}
        if port is not None:
            guestagent_utils.update_dict({'net.port': port}, options)

        self.configuration_manager.apply_system_override(options)
        self.status.set_host(instance_ip, port=port)
开发者ID:Hopebaytech,项目名称:trove,代码行数:11,代码来源:service.py


示例4: apply_next

 def apply_next(self, options):
     revision_num = self.count_revisions() + 1
     old_revision_backup = guestagent_utils.build_file_path(
         self._revision_backup_dir, self._base_config_name, str(revision_num), self._BACKUP_EXT
     )
     operating_system.copy(
         self._base_config_path, old_revision_backup, force=True, preserve=True, as_root=self._requires_root
     )
     current = operating_system.read_file(self._base_config_path, codec=self._codec)
     guestagent_utils.update_dict(options, current)
     operating_system.write_file(self._base_config_path, current, codec=self._codec, as_root=self._requires_root)
     operating_system.chown(self._base_config_path, self._owner, self._group, as_root=self._requires_root)
     operating_system.chmod(self._base_config_path, FileMode.ADD_READ_ALL, as_root=self._requires_root)
开发者ID:cretta,项目名称:trove,代码行数:13,代码来源:configuration.py


示例5: parse_configuration

    def parse_configuration(self):
        """Read contents of the configuration file (applying overrides if any)
        and parse it into a dict.

        :returns:        Configuration file as a Python dict.
        """

        base_options = operating_system.read_file(self._base_config_path, codec=self._codec)

        if self._override_strategy:
            updates = self._override_strategy.parse_updates()
            guestagent_utils.update_dict(updates, base_options)

        return base_options
开发者ID:cretta,项目名称:trove,代码行数:14,代码来源:configuration.py


示例6: test_ini_file_codec

    def test_ini_file_codec(self):
        data_no_none = {"Section1": {"s1k1": 's1v1',
                                     "s1k2": '3.1415926535'},
                        "Section2": {"s2k1": '1',
                                     "s2k2": 'True'}}

        self._test_file_codec(data_no_none, IniCodec())

        data_with_none = {"Section1": {"s1k1": 's1v1',
                                       "s1k2": '3.1415926535'},
                          "Section2": {"s2k1": '1',
                                       "s2k2": 'True',
                                       "s2k3": None}}

        # Keys with None values will be written without value.
        self._test_file_codec(data_with_none, IniCodec())

        # Non-string values will be converted to strings.
        data_with_none_as_objects = {"Section1": {"s1k1": 's1v1',
                                                  "s1k2": 3.1415926535},
                                     "Section2": {"s2k1": 1,
                                                  "s2k2": True,
                                                  "s2k3": None}}
        self._test_file_codec(data_with_none_as_objects, IniCodec(),
                              expected_data=data_with_none)

        # None will be replaced with 'default_value'.
        default_value = '1'
        expected_data = guestagent_utils.update_dict(
            {"Section2": {"s2k3": default_value}}, dict(data_with_none))
        self._test_file_codec(data_with_none,
                              IniCodec(default_value=default_value),
                              expected_data=expected_data)
开发者ID:pombredanne,项目名称:trove,代码行数:33,代码来源:test_operating_system.py


示例7: apply

    def apply(self, group_name, change_id, options):
        self._initialize_import_directory()
        revision_file = self._find_revision_file(group_name, change_id)
        if revision_file is None:
            # Create a new file.
            last_revision_index = self._get_last_file_index(group_name)
            revision_file = guestagent_utils.build_file_path(
                self._revision_dir,
                '%s-%03d-%s' % (group_name, last_revision_index + 1,
                                change_id),
                self._revision_ext)
        else:
            # Update the existing file.
            current = operating_system.read_file(
                revision_file, codec=self._codec, as_root=self._requires_root)
            options = guestagent_utils.update_dict(options, current)

        operating_system.write_file(
            revision_file, options, codec=self._codec,
            as_root=self._requires_root)
        operating_system.chown(
            revision_file, self._owner, self._group,
            as_root=self._requires_root)
        operating_system.chmod(
            revision_file, FileMode.ADD_READ_ALL, as_root=self._requires_root)
开发者ID:Tesora-Release,项目名称:tesora-trove,代码行数:25,代码来源:configuration.py


示例8: update_configuration

    def update_configuration(self, options):
        """Update given options in the configuration.

        The updates are stored in a 'system' revision if the manager
        supports configuration overrides. Otherwise they get applied
        directly to the base configuration file.

        The 'system' revision is always applied last to ensure it
        overrides any user-specified configuration changes.
        """
        if self._override_strategy:
            # Update the system overrides.
            system_overrides = self._override_strategy.get(self._current_revision + 1)
            guestagent_utils.update_dict(options, system_overrides)
            # Re-apply the updated system overrides.
            self._override_strategy.remove_last(1)
            self._override_strategy.apply_next(system_overrides)
        else:
            # Update the base configuration file.
            config = self.parse_configuration()
            guestagent_utils.update_dict(options, config)
            self.render_configuration(config)
开发者ID:cretta,项目名称:trove,代码行数:22,代码来源:configuration.py


示例9: test_update_dict

    def test_update_dict(self):
        self.assertEqual({}, guestagent_utils.update_dict({}, {}))
        self.assertEqual({'key': 'value'},
                         guestagent_utils.update_dict({}, {'key': 'value'}))
        self.assertEqual({'key': 'value'},
                         guestagent_utils.update_dict({'key': 'value'}, {}))

        data = {'rpc_address': "0.0.0.0",
                'broadcast_rpc_address': '0.0.0.0',
                'listen_address': '0.0.0.0',
                'seed_provider': [{
                    'class_name':
                    'org.apache.cassandra.locator.SimpleSeedProvider',
                    'parameters': [{'seeds': '0.0.0.0'}]}]
                }

        updates = {'rpc_address': "127.0.0.1",
                   'seed_provider': {'parameters':
                                     {'seeds': '127.0.0.1'}
                                     }
                   }

        updated = guestagent_utils.update_dict(updates, data)

        expected = {'rpc_address': "127.0.0.1",
                    'broadcast_rpc_address': '0.0.0.0',
                    'listen_address': '0.0.0.0',
                    'seed_provider': [{
                        'class_name':
                        'org.apache.cassandra.locator.SimpleSeedProvider',
                        'parameters': [{'seeds': '127.0.0.1'}]}]
                    }

        self.assertEqual(expected, updated)

        updates = {'seed_provider':
                   [{'class_name':
                     'org.apache.cassandra.locator.SimpleSeedProvider'
                     }]
                   }

        updated = guestagent_utils.update_dict(updates, data)

        expected = {'rpc_address': "127.0.0.1",
                    'broadcast_rpc_address': '0.0.0.0',
                    'listen_address': '0.0.0.0',
                    'seed_provider': [{
                        'class_name':
                        'org.apache.cassandra.locator.SimpleSeedProvider'}]
                    }

        self.assertEqual(expected, updated)

        data = {'timeout': 0, 'save': [[900, 1], [300, 10]]}
        updates = {'save': [[900, 1], [300, 10], [60, 10000]]}
        updated = guestagent_utils.update_dict(updates, data)
        expected = {'timeout': 0, 'save': [[900, 1], [300, 10], [60, 10000]]}

        self.assertEqual(expected, updated)
开发者ID:zn-share,项目名称:trove,代码行数:59,代码来源:test_guestagent_utils.py


示例10: _regenerate_base_configuration

    def _regenerate_base_configuration(self):
        """Gather all configuration changes and apply them in order on the base
        revision. Write the results to the configuration file.
        """

        if not os.path.exists(self._base_revision_file):
            # Initialize the file with the current configuration contents if it
            # does not exist.
            operating_system.copy(
                self._base_config_path, self._base_revision_file,
                force=True, preserve=True, as_root=self._requires_root)

        base_revision = operating_system.read_file(
            self._base_revision_file, codec=self._codec)
        changes = self._import_strategy.parse_updates()
        updated_revision = guestagent_utils.update_dict(changes, base_revision)
        operating_system.write_file(
            self._base_config_path, updated_revision, codec=self._codec,
            as_root=self._requires_root)
开发者ID:cp16net,项目名称:trove,代码行数:19,代码来源:configuration.py


示例11: test_update_dict

 def test_update_dict(self):
     data = [{
         'dict': {}, 'update': {}, 'expected': {},
     }, {
         'dict': None, 'update': {}, 'expected': {},
     }, {
         'dict': {}, 'update': None, 'expected': {},
     }, {
         'dict': {}, 'update': None, 'expected': {},
     }, {
         'dict': None, 'update': {'name': 'Tom'},
         'expected': {'name': 'Tom'},
     }, {
         'dict': {}, 'update': {'name': 'Tom'},
         'expected': {'name': 'Tom'},
     }, {
         'dict': {'name': 'Tom'}, 'update': {},
         'expected': {'name': 'Tom'},
     }, {
         'dict': {'key1': 'value1',
                  'dict1': {'key1': 'value1', 'key2': 'value2'}},
         'update': {'key1': 'value1+',
                    'key2': 'value2',
                    'dict1': {'key3': 'value3'}},
         'expected': {'key1': 'value1+',
                      'key2': 'value2',
                      'dict1': {'key1': 'value1', 'key2': 'value2',
                                'key3': 'value3'}},
     }, {
         'dict': {'d1': {'d2': {'d3': {'k1': 'v1'}}}},
         'update': {'d1': {'d2': {'d3': {'k2': 'v2'}}}},
         'expected': {'d1': {'d2': {'d3': {'k1': 'v1', 'k2': 'v2'}}}},
     }, {
         'dict': {'timeout': 0, 'save': [[900, 1], [300, 10]]},
         'update': {'save': [[300, 20], [60, 10000]]},
         'expected': {'timeout': 0,
                      'save': [[300, 20], [60, 10000]]},
     }, {
         'dict': {'rpc_address': '0.0.0.0',
                  'broadcast_rpc_address': '0.0.0.0',
                  'listen_address': '0.0.0.0',
                  'seed_provider': [{
                      'class_name':
                      'org.apache.cassandra.locator.SimpleSeedProvider',
                      'parameters': [{'seeds': '0.0.0.0'}]}]
                  },
         'update': {'rpc_address': '127.0.0.1',
                    'seed_provider': {'parameters': {
                        'seeds': '127.0.0.1'}}
                    },
         'expected': {'rpc_address': '127.0.0.1',
                      'broadcast_rpc_address': '0.0.0.0',
                      'listen_address': '0.0.0.0',
                      'seed_provider': [{
                          'class_name':
                          'org.apache.cassandra.locator.SimpleSeedProvider',
                          'parameters': [{'seeds': '127.0.0.1'}]}]
                      },
     }, {
         'dict': {'rpc_address': '127.0.0.1',
                  'broadcast_rpc_address': '0.0.0.0',
                  'listen_address': '0.0.0.0',
                  'seed_provider': [{
                      'class_name':
                      'org.apache.cassandra.locator.SimpleSeedProvider',
                      'parameters': [{'seeds': '0.0.0.0'}]}]
                  },
         'update': {'seed_provider':
                    [{'class_name':
                      'org.apache.cassandra.locator.SimpleSeedProvider'}]
                    },
         'expected': {'rpc_address': '127.0.0.1',
                      'broadcast_rpc_address': '0.0.0.0',
                      'listen_address': '0.0.0.0',
                      'seed_provider': [{
                          'class_name':
                          'org.apache.cassandra.locator.SimpleSeedProvider'
                      }]},
     }]
     count = 0
     for record in data:
         count += 1
         target = record['dict']
         update = record['update']
         expected = record['expected']
         result = guestagent_utils.update_dict(update, target)
         msg = 'Unexpected result for test %s' % str(count)
         self.assertEqual(expected, result, msg)
开发者ID:512868516,项目名称:trove,代码行数:88,代码来源:test_guestagent_utils.py



注:本文中的trove.guestagent.common.guestagent_utils.update_dict函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python operating_system.chmod函数代码示例发布时间:2022-05-27
下一篇:
Python guestagent_utils.build_file_path函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap