• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python containers.DotDict类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyon.util.containers.DotDict的典型用法代码示例。如果您正苦于以下问题：Python DotDict类的具体用法？Python DotDict怎么用？Python DotDict使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了DotDict类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: ChatServerService

class ChatServerService(BaseChatsService):
    """Chat server that tracks registered users and relays messages to them.

    Registered users live in ``self.clients``, a ``DotDict`` keyed by user
    name. Each entry is itself a ``DotDict`` with the keys ``procid``,
    ``user_name`` and ``client`` (the RPC client used to reach that user).
    """

    def on_init(self):
        """Create the empty client registry."""
        # Parenthesised single-argument print behaves the same on Python 2 and 3.
        print("INIT CHAT SERVER")
        self.clients = DotDict()

    def register(self, user_name='', proc_id=''):
        """Register ``user_name`` and create an RPC client for ``proc_id``.

        An existing registration under the same user name is replaced.

        Returns:
            The string ``"OK"``.
        """
        print("Registering user %s, client %s" % (user_name, proc_id))
        client = ProcessRPCClient(node=self.container.node, name=proc_id, iface=IChatcService, process=self)
        self.clients[user_name] = DotDict(procid=proc_id, user_name=user_name, client=client)
        return "OK"

    def unregister(self, user_name=''):
        """Remove ``user_name`` from the registry.

        Returns:
            The string ``"OK"``.

        Raises:
            KeyError: if ``user_name`` is not registered.
        """
        # The original logged ``proc_id``, which is not defined in this method
        # and raised NameError on every call; log the user name instead.
        log.debug("Unregistering user %s", user_name)
        del self.clients[user_name]
        return "OK"

    def message(self, from_name='', to_name='', text=''):
        """Deliver ``text`` from ``from_name`` to one user or to everybody.

        ``to_name == "all"`` broadcasts to every registered client (including
        the sender, if registered).

        Returns:
            ``"OK"`` on delivery, ``"USER NOT FOUND"`` when ``to_name`` is not
            a registered user.
        """
        if to_name == "all":
            for entry in self.clients.values():
                entry.client.message(from_name=from_name, text=text)
            return "OK"

        recipient = self.clients.get(to_name, None)
        if not recipient:
            return "USER NOT FOUND"

        recipient.client.message(from_name=from_name, text=text)
        return "OK"

    def list_users(self):
        """Return the registered user names, rendered as a string."""
        return str(self.clients.keys())
开发者ID:dstuebe,项目名称:coi-services,代码行数:30,代码来源:chats_service.py


示例2: test_create_schedule

    def test_create_schedule(self):
        """Create a process and schedule it; the core must be asked to schedule once."""

        # Process definition handed back by the mocked backend lookup.
        proc_def = DotDict()
        proc_def['name'] = "someprocess"
        proc_def['executable'] = {'module': 'my_module', 'class': 'class', 'url': 'myurl'}
        self.pd_service.backend.read_definition = Mock(return_value=proc_def)

        pid = self.pd_service.create_process("fake-process-def-id")

        # Schedule with a constraint and no node/engine preferences.
        proc_schedule = DotDict()
        proc_schedule['target'] = DotDict()
        proc_schedule.target['constraints'] = {"hats": 4}
        proc_schedule.target['node_exclusive'] = None
        proc_schedule.target['execution_engine_id'] = None

        configuration = {"some": "value"}

        pid2 = self.pd_service.schedule_process("fake-process-def-id",
            proc_schedule, configuration, pid)

        # The generated pid is derived from the definition name but is not
        # equal to it. (The original repeated this assertion twice verbatim.)
        self.assertTrue(pid.startswith(proc_def.name) and pid != proc_def.name)
        # Scheduling hands back the pid that was passed in.
        self.assertEqual(pid, pid2)

        self.assertEqual(self.mock_core.schedule_process.call_count, 1)
开发者ID:kerfoot,项目名称:coi-services,代码行数:27,代码来源:test_process_dispatcher.py


示例3: setUp

    def setUp(self):
        """Start the container and service clients; optionally prepare ElasticSearch.

        When the module-level ``use_es`` flag is truthy, the ElasticSearch
        settings in ``CFG`` are overridden, an ElasticSearch client is created
        and an index bootstrap process is spawned with the
        ``clean_bootstrap`` op.
        """
        super(DiscoveryIntTest, self).setUp()

        self._start_container()
        self.addCleanup(DiscoveryIntTest.es_cleanup)
        self.container.start_rel_from_url('res/deploy/r2dm.yml')

        # Clients for the services used by the tests.
        self.discovery = DiscoveryServiceClient()
        self.catalog = CatalogManagementServiceClient()
        self.ims = IndexManagementServiceClient()
        self.rr = ResourceRegistryServiceClient()

        if not use_es:
            return

        self.es_host = CFG.get_safe('server.elasticsearch.host', 'localhost')
        self.es_port = CFG.get_safe('server.elasticsearch.port', '9200')

        # Override the shard/replica settings in the shared configuration.
        CFG.server.elasticsearch.shards = 1
        CFG.server.elasticsearch.replicas = 0
        CFG.server.elasticsearch.river_shards = 1
        CFG.server.elasticsearch.river_replicas = 0

        self.es = ep.ElasticSearch(host=self.es_host, port=self.es_port, timeout=10, verbose=True)

        # Bootstrap process gets a DotDict built from CFG plus the op to run.
        bootstrap_cfg = DotDict(CFG)
        bootstrap_cfg.op = 'clean_bootstrap'
        self.container.spawn_process(
            'index_bootstrap',
            'ion.processes.bootstrap.index_bootstrap',
            'IndexBootStrap',
            bootstrap_cfg)
开发者ID:ooici-eoi,项目名称:coi-services,代码行数:28,代码来源:discovery_test.py


示例4: test_dot_dict_constant

    def test_dot_dict_constant(self):
        """Check DotDict attribute access before and after ``lock()``.

        Before locking, the test expects attribute assignment to work and
        expects reading a missing attribute (``d.foo1``) to leave an entry
        behind. After locking, it expects assignment and reads of missing
        attributes to raise AttributeError without creating entries.

        NOTE(review): the statement forms below (plain assignment, passing the
        attribute to a call) appear to be what is under test, so they should
        not be refactored -- confirm against the DotDict implementation.
        """
        d = DotDict({"foo": "bar"})
        self.assertEqual("bar", d.foo)
        d.foo = "somethingnew"
        self.assertEqual("somethingnew", d.foo)

        # DotDict only checks that an assignment operation is happening when it creates dummy entries
        # ... it doesn't check that the dummy entry is on the left hand side of the assignment
        k = d.foo1
        self.assertIn("foo1", dir(d))

        # From here on the dict is expected to reject changes.
        d.lock()

        # test assigning directly to a locked dict
        with self.assertRaises(AttributeError):
            d.foo = "somethingelse"
        # The previous value must survive the rejected assignment.
        self.assertEqual("somethingnew", d.foo)

        # test dummy-creation-on-assignment loophole
        with self.assertRaises(AttributeError):
            k = d.foo2
        self.assertNotIn("foo2", dir(d))

        # test alternate dummy creation method: calling a function with it
        with self.assertRaises(AttributeError):
            k = lambda _: True
            k(d.foo3)
        self.assertNotIn("foo3", dir(d))

        # The attribute named by DICT_LOCKING_ATTR must not show up in dir().
        self.assertNotIn(DICT_LOCKING_ATTR, dir(d))
开发者ID:ateranishi,项目名称:pyon,代码行数:30,代码来源:test_containers.py


示例5: start_app

    def start_app(self, appdef=None, config=None):
        """
        @brief Start an app from an app definition.
        Note: apps can come in one of 2 variants:
        1 processapp: In-line defined process to be started
        2 regular app: Full app definition

        Only the processapp variant is handled by the code visible here.

        @param appdef  Mapping describing the app; may carry 'config' and
                       'processapp' (a name, module, class triple).
        @param config  Optional extra configuration layered on top of the
                       app's own config.
        """
        log.debug("AppManager.start_app(appdef=%s) ..." % appdef)

        appdef = DotDict(appdef)
        app_config = DictModifier(CFG)

        if 'config' in appdef:
            # Apply config from app file
            app_file_cfg = DotDict(appdef.config)
            app_config.update(app_file_cfg)

        if config:
            # Nest dict modifier and apply config from rel file
            app_config = DictModifier(app_config, config)

        if 'processapp' in appdef:
            # Case 1: Appdef contains definition of process to start
            name, module, cls = appdef.processapp
            try:
                pid = self.container.spawn_process(name, module, cls, app_config)
                appdef._pid = pid
                self.apps.append(appdef)
            except Exception:
                # Deliberately best-effort: a failed start is logged (with
                # traceback) and not propagated. The original used the
                # Python-2-only "except Exception, ex" form and never used ex.
                log.exception("Appl %s start from processapp failed" % appdef.name)
开发者ID:blazetopher,项目名称:pyon,代码行数:30,代码来源:apps.py


示例6: start_app

    def start_app(self, appdef=None, config=None):
        """
        @brief Start an app from an app definition.
        Note: apps can come in one of 2 variants:
        1 processapp: In-line defined process to be started
        2 regular app: Full app definition

        Only the processapp variant is handled by the code visible here.

        @param appdef  Mapping describing the app; may carry 'config' and
                       'processapp' (a name, module, class triple).
        @param config  Optional configuration merged over a copy of the
                       app's own config.
        """
        log.debug("AppManager.start_app(appdef=%s) ..." % appdef)

        appdef = DotDict(appdef)

        if 'config' in appdef:
            # Work on a copy so the app definition's config is not mutated.
            app_cfg = appdef.config.copy()
            if config:
                dict_merge(app_cfg, config, inplace=True)
            config = app_cfg

        if 'processapp' in appdef:
            # Case 1: Appdef contains definition of process to start
            name, module, cls = appdef.processapp
            try:
                pid = self.container.spawn_process(name, module, cls, config)
                appdef._pid = pid
                self.apps.append(appdef)
            except Exception:
                # Deliberately best-effort: a failed start is logged (with
                # traceback) and not propagated. The original used the
                # Python-2-only "except Exception, ex" form and never used ex.
                log.exception("Appl %s start from processapp failed" % appdef.name)
开发者ID:swarbhanu,项目名称:pyon,代码行数:26,代码来源:apps.py


示例7: create_data_process_logger

    def create_data_process_logger(self, data_product_id, clone_id, argument_map):
        """
        Launch a data process built around the 'stream_logger' transform function.

        Creates the transform function (registering its deletion as a test
        cleanup), then a data process definition, then the data process
        itself with data_product_id as input and clone_id as output.

        Returns the id of the created data process.
        """
        # The output parameter name is the first value of the argument map.
        out_name = argument_map.values()[0]
        dpms = self.data_process_management

        # Make the transform function
        transform_fn = IonObject(
            RT.TransformFunction,
            name='stream_logger',
            description='',
            function='stream_logger',
            module='ion.services.sa.test.test_data_process_functions',
            arguments=['x'],
            function_type=TransformFunctionType.TRANSFORM)
        func_id = dpms.create_transform_function(transform_fn)
        self.addCleanup(dpms.delete_transform_function, func_id)

        # Make the data process definition
        definition = IonObject(
            RT.DataProcessDefinition,
            name='stream_logger',
            description='logs some stream stuff',
            data_process_type=DataProcessTypeEnum.RETRIEVE_PROCESS)
        dpd_id = dpms.create_data_process_definition(definition, func_id)

        process_cfg = DotDict()
        process_cfg.publish_limit = 40

        return dpms.create_data_process(
            data_process_definition_id=dpd_id,
            inputs=[data_product_id],
            outputs=[clone_id],
            configuration=process_cfg,
            argument_map=argument_map,
            out_param_name=out_name)
开发者ID:ednad,项目名称:coi-services,代码行数:33,代码来源:test_data_process_functions.py


示例8: load_data_process

    def load_data_process(self, stream_id=""):
        """Look up the data process for stream_id and register it with this worker.

        The details are stored in self._dataprocesses, the input stream (if
        any) is mapped to the data process id in self._streamid_map, a
        'DataProcess' event is published and an output publisher is created.

        Returns a one-element list holding the data process id.
        """
        client = DataProcessManagementServiceClient()

        # A falsy lookup result is treated as an empty record.
        details = DotDict(client.read_data_process_for_stream(stream_id) or {})
        dataprocess_id = details.dataprocess_id

        # Metrics attribute: granule count starts at zero.
        details.granule_counter = 0

        self._dataprocesses[dataprocess_id] = details

        # Add the stream id to the map
        if 'in_stream_id' in details:
            in_stream_id = details['in_stream_id']
            if in_stream_id in self._streamid_map:
                self._streamid_map[in_stream_id].append(dataprocess_id)
            else:
                self._streamid_map[in_stream_id] = [dataprocess_id]

        # todo: add transform worker id
        self.event_publisher.publish_event(
            origin=dataprocess_id,
            origin_type='DataProcess',
            status=DataProcessStatusType.NORMAL,
            description='data process loaded into transform worker')

        # Create a publisher for output stream
        self.create_publisher(dataprocess_id, details)

        return [dataprocess_id]
开发者ID:pkediyal,项目名称:coi-services,代码行数:27,代码来源:transform_worker.py


示例9: _launch_transform

    def _launch_transform(self, name_of_transform = '', data_proc_def_id = None, input_dpod_id = None, output_dpod_id = None):

        # We need the key name here to be "L2_stream", since when the data process is launched, this name goes into
        # the config as in config.process.publish_streams.L2_stream when the config is used to launch the data process

        if name_of_transform in ['L0', 'L1']:
            binding = '%s_stream' % name_of_transform
        elif name_of_transform == 'L2_salinity':
            binding = 'salinity'
        elif name_of_transform == 'L2_density':
            binding = 'density'

        output_products = {binding : output_dpod_id}

        config = None
        if name_of_transform == 'L1':
            config = self._create_calibration_coefficients_dict()
        elif name_of_transform == 'L2_density':
            config = DotDict()
            config.process = {'lat' : 32.7153, 'lon' : 117.1564}

        data_proc_id = self.data_process_management.create_data_process( data_proc_def_id, [input_dpod_id], output_products, config)
        self.addCleanup(self.data_process_management.delete_data_process, data_proc_id)

        self.data_process_management.activate_data_process(data_proc_id)
        self.addCleanup(self.data_process_management.deactivate_data_process, data_proc_id)

        log.debug("Created a data process for ctdbp %s transform: id = %s", name_of_transform, data_proc_id)

        return data_proc_id
开发者ID:mbarry02,项目名称:coi-services,代码行数:30,代码来源:test_ctdbp_chain_L0_L1_L2.py


示例10: preload

 def preload(self):
     """Spawn the IONLoader process with the 'load' op for the BETA scenario."""
     loader_cfg = DotDict()
     loader_cfg.op = 'load'
     loader_cfg.scenario = 'BETA'
     loader_cfg.categories = 'Parser,Reference'
     loader_cfg.path = 'master'
     self.container.spawn_process(
         'ion_loader',
         'ion.processes.bootstrap.ion_loader',
         'IONLoader',
         loader_cfg)
开发者ID:MatthewArrott,项目名称:coi-services,代码行数:7,代码来源:test_qc_functions.py


示例11: test_get_last_granule

    def test_get_last_granule(self, mock_bb, dsm_cli, dsm):
        """get_last_granule returns whatever the mocked builder's to_granule yields."""

        mock_bb().sync_rdt_with_coverage = Mock()
        mock_bb().to_granule.return_value = {'test':True}

        dsm_cli().read_dataset = Mock()
        # NOTE(review): this dataset record is built but never attached to a
        # mock (e.g. as read_dataset's return value) -- confirm whether that
        # wiring was intended.
        dataset = DotDict()
        dataset.datastore_name = 'test'
        dataset.view_name = 'bogus/view'

        dsm._get_coverage = Mock()
        dsm._get_coverage.return_value = {}

        # Datastore whose view query yields a single document.
        datastore = DotDict()
        datastore.query_view = Mock()
        datastore.query_view.return_value = [{'doc':{'ts_create':0}}]

        # Fake container that hands out the datastore above.
        container = DotDict()
        container.datastore_manager.get_datastore = Mock()
        container.datastore_manager.get_datastore.return_value = datastore

        retval = self.replay.get_last_granule(container,'dataset_id')

        # assertEqual replaces the deprecated assertEquals alias and matches
        # the other tests.
        self.assertEqual(retval,{'test':True})
开发者ID:pombredanne,项目名称:coi-services,代码行数:25,代码来源:replay_process_test.py


示例12: cache_resources

    def cache_resources(self, resource_type, specific_ids=None):
        """
        Load resources of one type into the in-memory cache for later lookups.

        With specific_ids=None every resource of resource_type is fetched;
        otherwise specific_ids must be a list and only those ids are read
        (an empty list fetches nothing).

        This is a PREFETCH operation, and EnhancedResourceRegistryClient objects that use the cache functionality
        should NOT be kept across service calls.
        """
        started = get_ion_ts()

        if specific_ids is None:
            fetched, _ = self.RR.find_resources(restype=resource_type, id_only=False)
        else:
            assert type(specific_ids) is list
            fetched = self.RR.read_mult(specific_ids) if specific_ids else []

        # Fresh, empty lookup tables for this resource type.
        tables = DotDict()
        tables.by_id = {}
        tables.by_name = {}
        self._cached_resources[resource_type] = tables

        for resource in fetched:
            self._add_resource_to_cache(resource_type, resource)

        finished = get_ion_ts()

        # Elapsed time; not used further in the code visible here.
        total_time = int(finished) - int(started)
开发者ID:MatthewArrott,项目名称:coi-services,代码行数:30,代码来源:enhanced_resource_registry_client.py


示例13: test_dot_dict

 def test_dot_dict(self):
     """Nested dicts are reachable via dotted access; new attributes become keys."""
     nested = DotDict({"foo": {"bar": {"bah": "fah"}}})
     self.assertEqual(nested.foo.bar.bah, "fah")

     nested.a = "1"
     self.assertEqual(nested.a, "1")
     self.assertIn("a", nested)
开发者ID:pkediyal,项目名称:pyon,代码行数:7,代码来源:test_containers.py


示例14: preload_ui

 def preload_ui(self):
     """Spawn the IONLoader process with the 'loadui' op."""
     ui_cfg = DotDict()
     ui_cfg.op = 'loadui'
     ui_cfg.loadui = True
     ui_cfg.attachments = 'res/preload/r2_ioc/attachments'
     ui_cfg.ui_path = "http://userexperience.oceanobservatories.org/database-exports/Candidates"

     self.container.spawn_process(
         'preloader',
         'ion.processes.bootstrap.ion_loader',
         'IONLoader',
         ui_cfg)
开发者ID:ateranishi,项目名称:coi-services,代码行数:8,代码来源:test_dm_extended.py


示例15: _create_instrument_config_builder

    def _create_instrument_config_builder(self):
        """Build an InstrumentAgentConfigurationBuilder wired to this test's clients."""
        service_clients = DotDict()
        service_clients.resource_registry = self.RR
        service_clients.pubsub_management = self.PSC
        service_clients.dataset_management = self.DSC

        return InstrumentAgentConfigurationBuilder(service_clients)
开发者ID:mbarry02,项目名称:coi-services,代码行数:8,代码来源:base_test_platform_agent_with_rsn.py


示例16: test_is_persisted

    def test_is_persisted(self):
        """is_persisted reports True when the stream read back is marked persisted."""
        stream = DotDict()
        stream.persisted = True
        self.pubsub_read.return_value = stream

        retval = self.ingestion_management.is_persisted("stream_id")

        # assertEqual replaces the deprecated assertEquals alias and matches
        # the other tests.
        self.assertEqual(retval, True)
开发者ID:pombredanne,项目名称:coi-services,代码行数:8,代码来源:ingestion_management_test.py


示例17: lc_preload

 def lc_preload(self):
     """Spawn the IONLoader process with the 'load' op for the BASE and LC_TEST scenarios."""
     preload_cfg = DotDict()
     preload_cfg.op = 'load'
     preload_cfg.scenario = 'BASE,LC_TEST'
     preload_cfg.categories = 'ParameterFunctions,ParameterDefs,ParameterDictionary'
     preload_cfg.path = 'res/preload/r2_ioc'

     self.container.spawn_process(
         'preload',
         'ion.processes.bootstrap.ion_loader',
         'IONLoader',
         preload_cfg)
开发者ID:mbarry02,项目名称:coi-services,代码行数:8,代码来源:test_int_data_process_management_service.py


示例18: test_bridge_create_schedule

    def test_bridge_create_schedule(self):
        """Schedule a process through the bridge backend and check the dashi call and events."""
        # Configuring a process_dispatcher_bridge section is expected to
        # select the bridge backend on init().
        pdcfg = dict(uri="amqp://hello", topic="pd", exchange="123")
        self.pd_service.CFG = DotDict()
        self.pd_service.CFG['process_dispatcher_bridge'] = pdcfg
        self.pd_service.init()
        self.assertIsInstance(self.pd_service.backend, PDBridgeBackend)

        event_pub = Mock()
        self.pd_service.backend.event_pub = event_pub

        # sneak in and replace dashi connection method
        mock_dashi = Mock()
        mock_dashi.consume.return_value = lambda : None
        self.pd_service.backend._init_dashi = lambda : mock_dashi

        self.pd_service.start()
        self.assertEqual(mock_dashi.handle.call_count, 1)

        # Process definition returned by the mocked resource registry read.
        proc_def = DotDict()
        proc_def['name'] = "someprocess"
        proc_def['executable'] = {'module':'my_module', 'class':'class'}
        self.mock_rr_read.return_value = proc_def

        pid = self.pd_service.create_process("fake-process-def-id")

        proc_schedule = DotDict()
        proc_schedule['target'] = DotDict()
        proc_schedule.target['constraints'] = {"hats" : 4}

        configuration = {"some": "value"}

        pid2 = self.pd_service.schedule_process("fake-process-def-id",
            proc_schedule, configuration, pid)

        # The generated pid is derived from the definition name but is not
        # equal to it. (The original repeated this assertion twice verbatim.)
        self.assertTrue(pid.startswith(proc_def.name) and pid != proc_def.name)
        self.assertEqual(pid, pid2)

        # Exactly one dashi call, with the expected target and keyword set.
        self.assertEqual(mock_dashi.call.call_count, 1)
        call_args, call_kwargs = mock_dashi.call.call_args
        self.assertEqual(set(call_kwargs),
            set(['upid', 'spec', 'subscribers', 'constraints']))
        self.assertEqual(call_kwargs['constraints'],
            proc_schedule.target['constraints'])
        self.assertEqual(call_kwargs['subscribers'],
            self.pd_service.backend.pd_process_subscribers)
        self.assertEqual(call_args, ("pd", "dispatch_process"))
        self.assertEqual(event_pub.publish_event.call_count, 0)

        # trigger some fake async state updates from dashi. first
        # should not trigger an event

        self.pd_service.backend._process_state(dict(upid=pid,
            state="400-PENDING"))
        self.assertEqual(event_pub.publish_event.call_count, 0)

        # The RUNNING state is expected to publish exactly one event.
        self.pd_service.backend._process_state(dict(upid=pid,
            state="500-RUNNING"))
        self.assertEqual(event_pub.publish_event.call_count, 1)
开发者ID:dstuebe,项目名称:coi-services,代码行数:58,代码来源:test_process_dispatcher.py


示例19: test_query_request_collection

    def test_query_request_collection(self):
        """A query carrying 'collection' yields the result of the stubbed query_collection."""
        query = DotDict()
        query.collection = 'test'

        self.discovery.query_collection = Mock(return_value='test')

        retval = self.discovery.query_request(query)
        # assertEqual reports both values on failure, unlike assertTrue(a == b).
        self.assertEqual(retval, 'test')
开发者ID:blazetopher,项目名称:coi-services,代码行数:9,代码来源:discovery_test.py


示例20: test_query_request_association

    def test_query_request_association(self):
        """A query carrying 'association' yields the result of the stubbed query_association."""
        query = DotDict()
        query.association = 'resource_id'

        self.discovery.query_association = Mock(return_value='test')

        retval = self.discovery.query_request(query)
        # assertEqual reports both values on failure, unlike assertTrue(a == b).
        self.assertEqual(retval, 'test')
开发者ID:blazetopher,项目名称:coi-services,代码行数:9,代码来源:discovery_test.py



注:本文中的pyon.util.containers.DotDict类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python file_sys.FileSystem类代码示例发布时间:2022-05-27
下一篇:
Python containers.named_any函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap