• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python arg_check.validate_is_not_none函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyon.util.arg_check.validate_is_not_none函数的典型用法代码示例。如果您正苦于以下问题:Python validate_is_not_none函数的具体用法?Python validate_is_not_none怎么用?Python validate_is_not_none使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了validate_is_not_none函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: add_data_product_version_to_collection

    def add_data_product_version_to_collection(self, data_product_id='', data_product_collection_id='', version_name='', version_description=''):
        """Record a data product as a new version in a data product collection.

        The stream definition of the data product being added is compared, through
        pubsub management, with the stream definition of the collection's base data
        product (the first entry of version_list). Only when the comparison succeeds
        is a DataProductVersion appended to the collection and a hasVersion
        association created.

        @param data_product_id    str
        @param data_product_collection_id    str
        @param version_name    str
        @param version_description    str
        @throws BadRequest    if either data product has no stream or no stream definition,
                              if the collection has no base version, or if the two
                              stream definitions do not compare as equivalent
        """
        registry = self.clients.resource_registry

        dp_collection_obj = registry.read(data_product_collection_id)

        # The returned object is not needed; the read is kept from the original
        # flow (presumably so the registry rejects an unknown id -- TODO confirm).
        registry.read(data_product_id)

        # An empty result is not None, so test truthiness before indexing [0];
        # otherwise a missing stream surfaces as an IndexError.
        new_data_product_streams, _ = registry.find_objects(subject=data_product_id, predicate=PRED.hasStream, object_type=RT.Stream, id_only=True)
        if not new_data_product_streams:
            raise BadRequest('The data product to add to the collection must have an associated stream')
        new_data_product_streamdefs, _ = registry.find_objects(subject=new_data_product_streams[0], predicate=PRED.hasStreamDefinition, object_type=RT.StreamDefinition, id_only=True)
        if not new_data_product_streamdefs:
            raise BadRequest('The data product to add to the collection must have an associated stream definition')

        # The base data product is the first version recorded in the collection.
        if not dp_collection_obj.version_list:
            raise BadRequest('The data product collection must contain a base version')
        base_data_product_id = dp_collection_obj.version_list[0].data_product_id
        registry.read(base_data_product_id)
        base_data_product_streams, _ = registry.find_objects(subject=base_data_product_id, predicate=PRED.hasStream, object_type=RT.Stream, id_only=True)
        if not base_data_product_streams:
            raise BadRequest('The base data product in the collection must have an associated stream')
        base_data_product_streamdefs, _ = registry.find_objects(subject=base_data_product_streams[0], predicate=PRED.hasStreamDefinition, object_type=RT.StreamDefinition, id_only=True)
        if not base_data_product_streamdefs:
            raise BadRequest('The base data product in the collection must have an associated stream definition')

        if not self.clients.pubsub_management.compare_stream_definition(stream_definition1_id=new_data_product_streamdefs[0], stream_definition2_id=base_data_product_streamdefs[0]):
            raise BadRequest("All Data Products in a collection must have equivelent stream definitions.")

        #todo: validate that the spatial/temporal domain match the base data product

        dpv = DataProductVersion()
        dpv.name = version_name
        dpv.description = version_description
        dpv.data_product_id = data_product_id

        dp_collection_obj.version_list.append(dpv)
        registry.update(dp_collection_obj)

        registry.create_association(subject=data_product_collection_id, predicate=PRED.hasVersion, object=data_product_id)
开发者ID:blazetopher,项目名称:coi-services,代码行数:33,代码来源:data_product_management_service.py


示例2: _launch_process

    def _launch_process(self, queue_name='', out_streams=None, process_definition_id='', configuration=None):
        """
        Launches the process

        Stores queue_name and out_streams under configuration['process']
        (replacing any existing 'process' entry) and schedules the process
        through the process dispatcher.

        @param queue_name    str
        @param out_streams    value stored as configuration['process']['publish_streams']
        @param process_definition_id    str
        @param configuration    dict, or None to start from an empty configuration
        @retval pid    the value returned by schedule_process
        @throws BadRequest    if schedule_process returned None
        """

        # configuration defaults to None; start from an empty dict in that case
        # instead of failing with a TypeError on the item assignment below.
        if configuration is None:
            configuration = {}

        # ------------------------------------------------------------------------------------
        # Spawn Configuration and Parameters
        # ------------------------------------------------------------------------------------

        configuration['process'] = {
            'queue_name': queue_name,
            'publish_streams': out_streams,
        }

        # ------------------------------------------------------------------------------------
        # Process Spawning
        # ------------------------------------------------------------------------------------
        # Spawn the process
        pid = self.clients.process_dispatcher.schedule_process(
            process_definition_id=process_definition_id,
            configuration=configuration
        )
        validate_is_not_none(pid, "Process could not be spawned")

        return pid
开发者ID:tomoreilly,项目名称:coi-services,代码行数:25,代码来源:data_process_management_service.py


示例3: create_dataset

    def create_dataset(self, name='', datastore_name='', view_name='', stream_id='', parameter_dict=None, spatial_domain=None, temporal_domain=None, parameter_dictionary_id='', description=''):
        """Register a new Dataset resource and create and save its coverage.

        When parameter_dictionary_id is given, the parameter dictionary is rebuilt
        from the stored parameter contexts and replaces any parameter_dict passed in.

        @param name    str
        @param datastore_name    str  falls back to self.DEFAULT_DATASTORE when empty
        @param view_name    str  falls back to self.DEFAULT_VIEW when empty
        @param stream_id    str  when non-empty, the stream is added to the dataset
        @param parameter_dict    parameter dictionary (dumped form)
        @param spatial_domain    required
        @param temporal_domain    required
        @param parameter_dictionary_id    str  alternative to parameter_dict
        @param description    str
        @retval dataset_id    str
        @throws BadRequest    if neither a parameter dictionary nor its id is supplied,
                              or if a spatial or temporal domain is missing
        """
        validate_true(parameter_dict or parameter_dictionary_id, 'A parameter dictionary must be supplied to register a new dataset.')
        validate_is_not_none(spatial_domain, 'A spatial domain must be supplied to register a new dataset.')
        validate_is_not_none(temporal_domain, 'A temporal domain must be supplied to register a new dataset.')

        if parameter_dictionary_id:
            pd = self.read_parameter_dictionary(parameter_dictionary_id)
            pcs = self.read_parameter_contexts(parameter_dictionary_id, id_only=False)
            parameter_dict = self._merge_contexts([ParameterContext.load(i.parameter_context) for i in pcs], pd.temporal_context)
            parameter_dict = parameter_dict.dump()

        dataset                      = Dataset()
        dataset.description          = description
        dataset.name                 = name
        dataset.primary_view_key     = stream_id or None
        dataset.datastore_name       = datastore_name or self.DEFAULT_DATASTORE
        dataset.view_name            = view_name or self.DEFAULT_VIEW
        dataset.parameter_dictionary = parameter_dict
        dataset.temporal_domain      = temporal_domain
        dataset.spatial_domain       = spatial_domain
        dataset.registered           = False

        dataset_id, _ = self.clients.resource_registry.create(dataset)
        if stream_id:
            self.add_stream(dataset_id, stream_id)

        log.debug('creating dataset: %s', dataset_id)

        cov = self._create_coverage(dataset_id, description or dataset_id, parameter_dict, spatial_domain, temporal_domain)
        # Close the coverage even when saving fails, so it is never left open.
        try:
            self._save_coverage(cov)
        finally:
            cov.close()

        return dataset_id
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:34,代码来源:dataset_management_service.py


示例4: suspend_data_product_persistence

    def suspend_data_product_persistence(self, data_product_id=''):
        """Stop persisting the stream of a data product.

        Reads the data product, looks up its stream id and asks ingestion
        management to unpersist that stream under the data product's
        dataset_configuration_id.

        @param data_product_id    str
        @throws NotFound    if the data product's dataset_configuration_id is None
        """
        product = self.clients.resource_registry.read(data_product_id)
        validate_is_not_none(product, 'Should not have been empty')
        validate_is_instance(product, DataProduct)

        ingestion_config_id = product.dataset_configuration_id
        if ingestion_config_id is None:
            raise NotFound("Data Product %s dataset configuration does not exist" % data_product_id)

        # The stream whose persistence is being suspended.
        stream_id = self._get_stream_id(data_product_id)
        missing_stream_msg = 'Data Product %s must have one stream associated' % str(data_product_id)
        validate_is_not_none(stream_id, missing_stream_msg)

        self.clients.ingestion_management.unpersist_data_stream(
            stream_id=stream_id,
            ingestion_configuration_id=ingestion_config_id)
开发者ID:blazetopher,项目名称:coi-services,代码行数:27,代码来源:data_product_management_service.py


示例5: assign_stream_definition_to_data_process_definition

    def assign_stream_definition_to_data_process_definition(
        self, stream_definition_id="", data_process_definition_id="", binding=""
    ):
        """Associate an output stream definition with a data process definition.

        Creates a hasStreamDefinition association and, when a binding name is
        given, records the stream definition id under that name in the data
        process definition's output_bindings. The data process definition is
        written back to the registry in either case.

        @param stream_definition_id    str
        @param data_process_definition_id    str
        @param binding    str  optional key into output_bindings
        """
        registry = self.clients.resource_registry

        # Both reads double as an existence check on the ids.
        stream_def = registry.read(stream_definition_id)
        process_def = registry.read(data_process_definition_id)

        no_stream_def_msg = "No stream definition object found for stream definition id: %s" % stream_definition_id
        validate_is_not_none(stream_def, no_stream_def_msg)

        no_process_def_msg = "No data process definition object found for data process definition id: %s" % data_process_definition_id
        validate_is_not_none(process_def, no_process_def_msg)

        registry.create_association(data_process_definition_id, PRED.hasStreamDefinition, stream_definition_id)

        if binding:
            process_def.output_bindings[binding] = stream_definition_id
        registry.update(process_def)
开发者ID:oldpatricka,项目名称:coi-services,代码行数:25,代码来源:data_process_management_service.py


示例6: _launch_process

    def _launch_process(self, queue_name='', out_streams=None, process_definition_id='', configuration=None):
        """
        Launches the process

        Adds queue_name and out_streams to configuration['process'] (creating
        that entry when absent, otherwise keeping its other keys) and schedules
        the process with restart mode ABNORMAL.

        @param queue_name    str
        @param out_streams    value stored as configuration['process']['publish_streams']
        @param process_definition_id    str
        @param configuration    dict, or None to start from an empty configuration
        @retval pid    the value returned by schedule_process
        @throws BadRequest    if schedule_process returned None
        """

        # configuration defaults to None; a membership test or item assignment
        # on None would raise TypeError, so start from an empty dict instead.
        if configuration is None:
            configuration = {}

        # ------------------------------------------------------------------------------------
        # Spawn Configuration and Parameters
        # ------------------------------------------------------------------------------------

        process_cfg = configuration.setdefault('process', {})
        process_cfg['queue_name'] = queue_name
        process_cfg['publish_streams'] = out_streams

        # Setting the restart mode
        schedule = ProcessSchedule()
        schedule.restart_mode = ProcessRestartMode.ABNORMAL

        # ------------------------------------------------------------------------------------
        # Process Spawning
        # ------------------------------------------------------------------------------------
        # Spawn the process
        pid = self.clients.process_dispatcher.schedule_process(
            process_definition_id=process_definition_id,
            schedule=schedule,
            configuration=configuration
        )
        validate_is_not_none(pid, "Process could not be spawned")

        return pid
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:30,代码来源:data_process_management_service.py


示例7: on_start

 def on_start(self):
     """Set up the clients, configuration and timer-event subscription.

     Reads 'process.interval_key' and 'process.qc_params' from the process
     configuration, then subscribes to TimerEvent events whose origin is
     the interval key, dispatching them to self._event_callback.
     """
     SimpleProcess.on_start(self)
     self.data_retriever = DataRetrieverServiceProcessClient(process=self)

     cfg = self.CFG
     self.interval_key = cfg.get_safe('process.interval_key', None)
     self.qc_params = cfg.get_safe('process.qc_params', [])

     # The subscription below is keyed on the interval key, so it is mandatory.
     validate_is_not_none(self.interval_key, 'An interval key is necessary to paunch this process')

     self.event_subscriber = EventSubscriber(
         event_type=OT.TimerEvent,
         origin=self.interval_key,
         callback=self._event_callback,
         auto_delete=True)
     self.add_endpoint(self.event_subscriber)

     self.resource_registry = self.container.resource_registry
开发者ID:shenrie,项目名称:coi-services,代码行数:9,代码来源:qc_post_processing.py


示例8: read_data_product_collection

    def read_data_product_collection(self, data_product_collection_id=''):
        """Read a data product collection from the resource registry.

        @param data_product_collection_id    str
        @retval the object the registry returned for that id
        """
        collection = self.clients.resource_registry.read(data_product_collection_id)
        validate_is_not_none(collection, "Should not have returned an empty result")
        return collection
开发者ID:blazetopher,项目名称:coi-services,代码行数:11,代码来源:data_product_management_service.py


示例9: activate_data_product_persistence

    def activate_data_product_persistence(self, data_product_id=''):
        """Persist data product data into a data set

        Uses the data product's own dataset_configuration_id as the ingestion
        configuration when set, otherwise the first ingestion configuration
        listed by ingestion management. The chosen id is stored back on the
        data product.

        @param data_product_id    str
        @throws BadRequest    if the data product is not found, has no stream, or
                              no ingestion configuration is available
        """
        #--------------------------------------------------------------------------------
        # retrieve the data_process object
        #--------------------------------------------------------------------------------
        data_product_obj = self.data_product.read_one(data_product_id)

        validate_is_not_none(data_product_obj, "The data product id should correspond to a valid registered data product.")

        #--------------------------------------------------------------------------------
        # get the Stream associated with this data product; the first one found is used
        #--------------------------------------------------------------------------------
        streams = self.data_product.find_stemming_stream(data_product_id)
        if not streams:
            raise BadRequest('Data Product %s must have one stream associated' % str(data_product_id))

        stream_id = streams[0]._id
        log.debug("Activating data product persistence for stream_id: %s", stream_id)

        #-----------------------------------------------------------------------------------------
        # grab the ingestion configuration id from the data_product in order to use to persist it
        #-----------------------------------------------------------------------------------------
        if data_product_obj.dataset_configuration_id:
            ingestion_configuration_id = data_product_obj.dataset_configuration_id
        else:
            # Fall back to the first known ingestion configuration; fail with a
            # clear error rather than an IndexError when none has been set up.
            ingestion_configurations = self.clients.ingestion_management.list_ingestion_configurations(id_only=True)
            if not ingestion_configurations:
                raise BadRequest('No ingestion configuration is available to persist Data Product %s' % str(data_product_id))
            ingestion_configuration_id = ingestion_configurations[0]

        #--------------------------------------------------------------------------------
        # persist the data stream using the ingestion config id and stream id
        #--------------------------------------------------------------------------------

        # find datasets for the data product
        dataset_id = self._get_dataset_id(data_product_id)
        log.debug("Activating data product persistence for dataset_id: %s", dataset_id)
        dataset_id = self.clients.ingestion_management.persist_data_stream(stream_id=stream_id,
                                                ingestion_configuration_id=ingestion_configuration_id,
                                                dataset_id=dataset_id)

        # register the dataset for externalization
        self.clients.dataset_management.register_dataset(dataset_id)

        #--------------------------------------------------------------------------------
        # todo: dataset_configuration_obj contains the ingest config for now...
        # Update the data product object
        #--------------------------------------------------------------------------------
        data_product_obj.dataset_configuration_id = ingestion_configuration_id
        self.update_data_product(data_product_obj)
开发者ID:tomoreilly,项目名称:coi-services,代码行数:54,代码来源:data_product_management_service.py


示例10: assign_input_stream_definition_to_data_process_definition

    def assign_input_stream_definition_to_data_process_definition(self, stream_definition_id='', data_process_definition_id=''):
        """Associate an input stream definition with a data process definition.

        Both resources are read first, then a hasInputStreamDefinition
        association is created from the data process definition to the
        stream definition.

        @param stream_definition_id    str
        @param data_process_definition_id    str
        """
        registry = self.clients.resource_registry

        # Both reads double as an existence check on the ids.
        stream_def = registry.read(stream_definition_id)
        process_def = registry.read(data_process_definition_id)

        no_stream_def_msg = "No stream definition object found for stream definition id: %s" % stream_definition_id
        validate_is_not_none(stream_def, no_stream_def_msg)

        no_process_def_msg = "No data process definition object found for data process definition id: %s" % data_process_definition_id
        validate_is_not_none(process_def, no_process_def_msg)

        registry.create_association(data_process_definition_id, PRED.hasInputStreamDefinition, stream_definition_id)
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:12,代码来源:data_process_management_service.py


示例11: read_data_product_version

    def read_data_product_version(self, data_product_version_id=''):
        """Read a data product version from the resource registry.

        @param data_product_version_id    str
        @retval the object the registry returned for that id
        """
        version_id_text = str(data_product_version_id)
        log.debug("DataProductManagementService:read_data_product_version: %s" % version_id_text)

        version = self.clients.resource_registry.read(data_product_version_id)
        validate_is_not_none(version, "Should not have returned an empty result")
        return version
开发者ID:pombredanne,项目名称:coi-services,代码行数:13,代码来源:data_product_management_service.py


示例12: update_data_product_collection

    def update_data_product_collection(self, data_product_collection=None):
        """Write an updated data product collection back to the resource registry.

        @param data_product_collection    the collection object to store
        """
        validate_is_not_none(data_product_collection, "Should not pass in a None object")

        registry = self.clients.resource_registry
        registry.update(data_product_collection)

        #TODO: any changes to producer? Call DataAcquisitionMgmtSvc?
开发者ID:blazetopher,项目名称:coi-services,代码行数:14,代码来源:data_product_management_service.py


示例13: get_data_product_provenance

    def get_data_product_provenance(self, data_product_id=''):
        """Collect the producers of a data product into a dictionary.

        The dictionary is reset on self.provenance_results, filled in by
        self.data_product._find_producers and then returned.

        @param data_product_id    str
        @retval provenance_results    dict
        """
        # Start from a clean result set on every call.
        self.provenance_results = {}

        product = self.data_product.read_one(data_product_id)
        validate_is_not_none(product, "Should have got a non empty data product")

        # todo: get the start time of this data product
        results = self.provenance_results
        self.data_product._find_producers(data_product_id, results)

        return self.provenance_results
开发者ID:blazetopher,项目名称:coi-services,代码行数:14,代码来源:data_product_management_service.py


示例14: unassign_stream_definition_from_data_process_definition

    def unassign_stream_definition_from_data_process_definition(self, stream_definition_id='', data_process_definition_id=''):
        """
        Remove the hasStreamDefinition associations between a data process
        definition and a stream definition.

        @param stream_definition_id    str
        @param data_process_definition_id    str
        """
        registry = self.clients.resource_registry

        # Every association linking the data process definition to the stream definition.
        links = registry.find_associations(data_process_definition_id, PRED.hasStreamDefinition, stream_definition_id, id_only=True)

        none_found_msg = "No Stream Definitions associated with data process definition ID " + str(data_process_definition_id)
        validate_is_not_none(links, none_found_msg)

        for link in links:
            registry.delete_association(link)
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:15,代码来源:data_process_management_service.py


示例15: create_data_product_collection

    def create_data_product_collection(self, data_product_id='', collection_name='', collection_description=''):
        """Create a data product collection whose base version is the given data product.

        The collection starts with a single DataProductVersion named 'base' and is
        linked to the data product through a hasVersion association.

        @param data_product_id    str
        @param collection_name    str
        @param collection_description    str
        @retval data_product_collection_id    str
        """
        validate_is_not_none(data_product_id, 'A data product identifier must be passed to create a data product version')

        registry = self.clients.resource_registry

        # The first entry of version_list is the base every later version builds on.
        base_version = DataProductVersion()
        base_version.name = 'base'
        base_version.description = 'the base version on which subsequent versions are built'
        base_version.data_product_id = data_product_id

        collection = IonObject(
            RT.DataProductCollection,
            name=collection_name,
            description=collection_description,
            version_list=[base_version])

        data_product_collection_id, _ = registry.create(collection)
        registry.create_association(subject=data_product_collection_id, predicate=PRED.hasVersion, object=data_product_id)

        return data_product_collection_id
开发者ID:blazetopher,项目名称:coi-services,代码行数:17,代码来源:data_product_management_service.py


示例16: test_validations

    def test_validations(self):
        """Each arg_check validator raises BadRequest when its condition is violated."""
        import pyon.util.arg_check as arg_check

        # Boolean and equality validators.
        with self.assertRaises(BadRequest):
            arg_check.validate_true(False, 'test')

        with self.assertRaises(BadRequest):
            arg_check.validate_equal(3, 4, 'test')

        with self.assertRaises(BadRequest):
            arg_check.validate_not_equal(4, 4, 'test')

        with self.assertRaises(BadRequest):
            arg_check.validate_false(True, 'test')

        # Identity validators: two distinct lists, then one list under two names.
        first, second = [], []
        with self.assertRaises(BadRequest):
            arg_check.validate_is(first, second, 'test')

        shared = []
        alias = shared
        with self.assertRaises(BadRequest):
            arg_check.validate_is_not(shared, alias, 'test')

        with self.assertRaises(BadRequest):
            arg_check.validate_is_not_none(None, 'test')

        # Membership validators.
        with self.assertRaises(BadRequest):
            arg_check.validate_in(2, [1, 3], 'test')

        with self.assertRaises(BadRequest):
            arg_check.validate_not_in(2, [1, 2, 3], 'test')

        # Type validators.
        with self.assertRaises(BadRequest):
            arg_check.validate_is_instance([], dict, 'test')

        with self.assertRaises(BadRequest):
            arg_check.validate_not_is_instance([], list, 'test')
开发者ID:ateranishi,项目名称:pyon,代码行数:46,代码来源:test_arg_check.py


示例17: create_data_product_version

    def create_data_product_version(self, data_product_id='', data_product_version=None):
        """Create a new version of an existing data product, with its own stream and dataset.

        The new dataset reuses the parameter dictionary and the spatial/temporal
        domains of the dataset associated with the original data product. That
        base dataset is looked up before anything is created, so a data product
        without a dataset is rejected without leaving a half-built version behind.

        @param data_product_id    str
        @param data_product_version    DataProductVersion
        @retval data_product_version_id    str
        @throws BadRequest    if an argument is None or the data product has no dataset
        """
        validate_is_not_none(data_product_id, 'A data product identifier must be passed to create a data product version')
        validate_is_not_none(data_product_version, 'A data product version (ion object) must be passed to create a data product version')

        registry = self.clients.resource_registry

        #-----------------------------------------------------------------------------------------------
        # Check the precondition first: get the dataset associated with the original data product
        #-----------------------------------------------------------------------------------------------
        dataset_ids, _ = registry.find_objects(subject=data_product_id, predicate=PRED.hasDataset, object_type=RT.DataSet, id_only=True)
        if not dataset_ids:
            raise BadRequest('No Dataset associated with the DataProduct %s' % str(data_product_id))

        base_dataset_id = str(dataset_ids[0])
        log.debug("DataProductManagementService:create_data_product_version base_dataset_id: %s", base_dataset_id)
        base_dataset_obj = self.clients.dataset_management.read_dataset(base_dataset_id)
        log.debug("DataProductManagementService:create_data_product_version base_dataset_obj: %s", base_dataset_obj)

        #-----------------------------------------------------------------------------------------------
        # Register the version and link it to the data product
        #-----------------------------------------------------------------------------------------------
        data_product_version_id, _ = registry.create(data_product_version)
        registry.create_association(subject=data_product_id, predicate=PRED.hasVersion, object=data_product_version_id)

        #-----------------------------------------------------------------------------------------------
        #Create the stream and a dataset for the new version
        #-----------------------------------------------------------------------------------------------
        stream_id = self.clients.pubsub_management.create_stream(name=data_product_version.name,
                                                                description=data_product_version.description)

        # Associate the Stream with the new data product version
        registry.create_association(subject=data_product_version_id, predicate=PRED.hasStream, object=stream_id)

        # create a dataset for this version. must have same parameter dictionary and spatial/temporal domain as original data product.
        data_set_id = self.clients.dataset_management.create_dataset(   name= 'data_set_%s' % stream_id,
                                                                        stream_id=stream_id,
                                                                        parameter_dict=base_dataset_obj.parameter_dictionary,
                                                                        temporal_domain=base_dataset_obj.temporal_domain,
                                                                        spatial_domain=base_dataset_obj.spatial_domain)
        registry.create_association(subject=data_product_version_id, predicate=PRED.hasDataset, object=data_set_id)

        return data_product_version_id
开发者ID:pombredanne,项目名称:coi-services,代码行数:45,代码来源:data_product_management_service.py


示例18: suspend_data_product_persistence

    def suspend_data_product_persistence(self, data_product_id=''):
        """Stop persisting every stream of a data product and unlink its datasets.

        Each stream found for the data product is unpersisted under the data
        product's dataset_configuration_id; afterwards every dataset found
        through hasDataset is unlinked from the data product.

        @param data_product_id    str
        @throws NotFound    if the data product's dataset_configuration_id is None
        @throws BadRequest    if the data product has no stream
        """
        registry = self.clients.resource_registry

        log.debug("suspend_data_product_persistence: data_product_id = %s"  % str(data_product_id))

        # Load and sanity-check the data product itself.
        product = registry.read(data_product_id)
        validate_is_not_none(product, 'Should not have been empty')
        validate_is_instance(product, DataProduct)

        if product.dataset_configuration_id is None:
            raise NotFound("Data Product %s dataset configuration does not exist" % data_product_id)

        log.debug("Data product: %s" % product)

        # Every stream of the data product gets unpersisted; at least one is required.
        stream_ids, _ = registry.find_objects(subject=data_product_id, predicate=PRED.hasStream, object_type=RT.Stream, id_only=True)
        if not stream_ids:
            raise BadRequest('Data Product %s must have one stream associated' % str(data_product_id))

        ingestion = self.clients.ingestion_management
        for stream_id in stream_ids:
            log.debug("suspend_data_product_persistence: stream = %s"  % str(stream_id))
            log.debug("data_product_obj.dataset_configuration_id: %s" % product.dataset_configuration_id)

            outcome = ingestion.unpersist_data_stream(
                stream_id=stream_id,
                ingestion_configuration_id=product.dataset_configuration_id)
            log.debug("suspend_data_product_persistence: deactivate = %s"  % str(outcome))

        # Finally detach the datasets from this data product.
        dataset_ids, _ = registry.find_objects(subject=data_product_id, predicate=PRED.hasDataset, id_only=True)
        for dataset_id in dataset_ids:
            self.data_product.unlink_data_set(data_product_id, dataset_id)
开发者ID:pombredanne,项目名称:coi-services,代码行数:44,代码来源:data_product_management_service.py


示例19: _get_input_stream_ids

    def _get_input_stream_ids(self, in_data_product_ids = None):
        """Return the stream id of each input data product, in input order.

        @param in_data_product_ids    list of data product ids; None is treated as empty
        @retval input_stream_ids    list with one stream id per input data product
        @throws BadRequest    if an input data product does not have exactly one stream
        """
        # Imported locally because this block cannot rely on the module-level
        # imports already providing validate_true.
        from pyon.util.arg_check import validate_true

        input_stream_ids = []

        #------------------------------------------------------------------------------------------------------------------------------------------
        # get the streams associated with this IN data products
        #------------------------------------------------------------------------------------------------------------------------------------------
        for in_data_product_id in in_data_product_ids or []:

            # Get the stream associated with this input data product
            stream_ids, _ = self.clients.resource_registry.find_objects(in_data_product_id, PRED.hasStream, RT.Stream, True)

            # An empty result is not None, so test truthiness; otherwise the
            # [0] below fails with an IndexError instead of a BadRequest.
            validate_true(stream_ids, "No Stream created for this input Data Product " + str(in_data_product_id))
            # The previous check passed a boolean to validate_is_not_none and
            # therefore could never fail; enforce the single-stream rule for real.
            validate_true(len(stream_ids) == 1, "Input Data Product should only have ONE stream " + str(in_data_product_id))

            input_stream_ids.append(stream_ids[0])

        return input_stream_ids
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:19,代码来源:data_process_management_service.py


示例20: update_event_process_definition

    def update_event_process_definition(self, event_process_definition_id='', version='', module='', class_name='', uri='', arguments=None, event_types=None, sub_types=None, origin_types=None):
        """
        Update the process definition for the event process.

        The executable fields, version and arguments are overwritten with the
        values passed in. For event_types, sub_types and origin_types a falsy
        argument falls back to the value on the existing definition detail,
        when there is one.

        @param event_process_definition_id str
        @param version str
        @param module str
        @param class_name str
        @param uri str
        @param arguments list
        @param event_types list
        @param sub_types list
        @param origin_types list
        """
        validate_is_not_none(event_process_definition_id)

        registry = self.clients.resource_registry

        # The event_process_def is really only a process_def. Read up the process definition
        process_def = registry.read(event_process_definition_id)
        existing_detail = process_def.definition

        # Build the replacement EventProcessDefinitionDetail
        if not existing_detail:
            new_detail = EventProcessDefinitionDetail(event_types = event_types, sub_types = sub_types, origin_types = origin_types)
        else:
            new_detail = EventProcessDefinitionDetail()
            new_detail.event_types = event_types or existing_detail.event_types
            new_detail.sub_types = sub_types or existing_detail.sub_types
            new_detail.origin_types = origin_types or existing_detail.origin_types

        # Update the fields of the process definition
        executable = process_def.executable
        executable['module'] = module
        executable['class'] = class_name
        executable['uri'] = uri
        process_def.version = version
        process_def.arguments = arguments
        process_def.definition = new_detail

        # Finally update the resource registry
        registry.update(process_def)
开发者ID:Bobfrat,项目名称:coi-services,代码行数:42,代码来源:event_management_service.py



注:本文中的pyon.util.arg_check.validate_is_not_none函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python arg_check.validate_true函数代码示例发布时间:2022-05-27
下一篇:
Python arg_check.validate_is_instance函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap