本文整理汇总了Python中pyon.ion.resource.ExtendedResourceContainer类的典型用法代码示例。如果您正苦于以下问题:Python ExtendedResourceContainer类的具体用法?Python ExtendedResourceContainer怎么用?Python ExtendedResourceContainer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了ExtendedResourceContainer类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_actor_identity_extension
def get_actor_identity_extension(self, user_id="", org_id="", ext_associations=None, ext_exclude=None):
    """Returns an ActorIdentityExtension object containing additional related information

    @param user_id    str
    @param org_id    str - optional; when given, the user's roles in that Org are attached as ``roles``
    @param ext_associations    dict
    @param ext_exclude    list
    @retval actor_identity    ActorIdentityExtension
    @throws BadRequest    A parameter is missing
    @throws NotFound    An object with the specified user_id does not exist
    """
    if not user_id:
        raise BadRequest("The user_id parameter is empty")

    extended_resource_handler = ExtendedResourceContainer(self)
    extended_user = extended_resource_handler.create_extended_resource_container(
        OT.ActorIdentityExtension, user_id, None, ext_associations, ext_exclude
    )

    # If the org_id is not provided then skip looking for Org related roles.
    if org_id:
        # No dependency on the org_management service is declared, to avoid a potential
        # circular bootstrap issue; this method should never be called until the system
        # is fully running, so the client is created on demand.
        try:
            org_client = OrgManagementServiceProcessClient(process=self)
            extended_user.roles = org_client.find_org_roles_by_user(org_id, user_id)
        except Exception:
            # Deliberately best-effort: if the role information is not available yet,
            # return the extension without roles and let the caller retry later.
            pass

    return extended_user
开发者ID:ooici-eoi,项目名称:coi-services,代码行数:32,代码来源:identity_management_service.py
示例2: get_user_info_extension
def get_user_info_extension(self, user_info_id='', org_id=''):
    """Returns an UserInfoExtension object containing additional related information

    @param user_info_id    str
    @param org_id    str - An optional org id that the user is interested in filtering against.
    @retval user_info    UserInfoExtension
    @throws BadRequest    A parameter is missing
    @throws NotFound    The user's roles could not be retrieved
    """
    if not user_info_id:
        raise BadRequest("The user_info_id parameter is empty")

    # HACK to get the UI going. It would be preferable to get the actor id from the
    # extended resource container below, but there would need to be a guaranteed order
    # of field processing to ensure that the actor identity has been found BEFORE the
    # negotiation methods are called - and probably some elegant way to indicate the
    # field and sub field; i.e. actor_identity._id
    actors, _ = self.clients.resource_registry.find_subjects(
        subject_type=RT.ActorIdentity, predicate=PRED.hasInfo, object=user_info_id, id_only=True)
    actor_id = actors[0] if actors else ''

    extended_resource_handler = ExtendedResourceContainer(self)
    extended_user = extended_resource_handler.create_extended_resource_container(
        extended_resource_type=OT.UserInfoExtension,
        resource_id=user_info_id,
        computed_resource_type=OT.ComputedAttributes,
        user_id=user_info_id,
        org_id=org_id,
        actor_id=actor_id)

    # Nothing to decorate if no extension object was produced.
    if not extended_user:
        return extended_user

    # No dependency on the org_management service is declared, to avoid a potential
    # circular bootstrap issue; this method should never be called until the system
    # is fully running, so the client is created on demand.
    try:
        org_client = OrgManagementServiceProcessClient(process=self)
        roles = org_client.find_all_roles_by_user(extended_user.actor_identity._id)
        extended_user.roles = list()
        for org_name in roles:
            for role in roles[org_name]:
                flattened_role = copy.copy(role.__dict__)
                # Have to do this to appease the message validators for ION objects
                del flattened_role['type_']
                # The UI code expects the org name inside each role dict
                flattened_role['org_name'] = org_name
                extended_user.roles.append(flattened_role)
    except Exception as e:
        raise NotFound('Could not retrieve UserRoles for User Info id: %s - %s' % (user_info_id, e.message))

    # Keep only notification requests that are not retired (no end date set)
    extended_user.subscriptions = [
        nr for nr in extended_user.subscriptions if nr.temporal_bounds.end_datetime == '']

    # Drop every NotificationRequest and every DELETED resource from the owned
    # resources, then append the active NotificationRequests again.
    extended_user.owned_resources = [
        rsrc for rsrc in extended_user.owned_resources
        if rsrc.type_ != OT.NotificationRequest and rsrc.lcstate != 'DELETED']
    extended_user.owned_resources.extend(extended_user.subscriptions)

    return extended_user
开发者ID:ednad,项目名称:coi-services,代码行数:60,代码来源:identity_management_service.py
示例3: get_data_process_definition_extension
def get_data_process_definition_extension(self, data_process_definition_id='', ext_associations=None, ext_exclude=None):
    """Returns a DataProcessDefinitionExtension object containing additional related information

    @param data_process_definition_id    str
    @param ext_associations    dict
    @param ext_exclude    list
    @retval data_process_definition    DataProcessDefinitionExtension
    @throws BadRequest    A parameter is missing
    """
    if not data_process_definition_id:
        raise BadRequest("The data_process_definition_id parameter is empty")

    handler = ExtendedResourceContainer(self)
    ext_definition = handler.create_extended_resource_container(
        extended_resource_type=OT.DataProcessDefinitionExtension,
        resource_id=data_process_definition_id,
        computed_resource_type=OT.DataProcessDefinitionComputedAttributes,
        ext_associations=ext_associations,
        ext_exclude=ext_exclude)

    # Strip the attachment payloads; the front end does not need them sent this way.
    # TODO - see if there is a better way to do this in the extended resource framework.
    if hasattr(ext_definition, 'attachments'):
        for attachment in ext_definition.attachments:
            if hasattr(attachment, 'content'):
                del attachment.content

    # Replace the list of lists with a single flat list, skipping empty entries
    ext_definition.data_products = [
        product
        for group in ext_definition.data_products if group
        for product in group if product
    ]
    return ext_definition
开发者ID:kerfoot,项目名称:coi-services,代码行数:33,代码来源:data_process_management_service.py
示例4: get_user_info_extension
def get_user_info_extension(self, user_info_id=''):
    """Returns an UserInfoExtension object containing additional related information

    @param user_info_id    str
    @retval user_info    UserInfoExtension
    @throws BadRequest    A parameter is missing
    @throws NotFound    The user's roles could not be retrieved
    """
    if not user_info_id:
        raise BadRequest("The user_info_id parameter is empty")

    extended_resource_handler = ExtendedResourceContainer(self)
    extended_user = extended_resource_handler.create_extended_resource_container(OT.UserInfoExtension, user_info_id)

    if extended_user:
        # No dependency on the org_management service is declared, to avoid a potential
        # circular bootstrap issue; this method should never be called until the system
        # is fully running, so the client is created on demand.
        try:
            org_client = OrgManagementServiceProcessClient(process=self)
            roles = org_client.find_all_roles_by_user(extended_user.actor_identity._id)
            extended_user.roles = list()
            for org_name in roles:
                for role in roles[org_name]:
                    flattened_role = copy.copy(role.__dict__)
                    # Have to do this to appease the message validators for ION objects
                    del flattened_role['type_']
                    # The UI code expects the org name inside each role dict
                    flattened_role['org_name'] = org_name
                    extended_user.roles.append(flattened_role)
        except Exception as e:
            raise NotFound('Could not retrieve UserRoles for User Info id: %s - %s' % (user_info_id, e.message))

    return extended_user
开发者ID:tomoreilly,项目名称:coi-services,代码行数:31,代码来源:identity_management_service.py
示例5: get_observatory_extension
def get_observatory_extension(self, observatory_id='', ext_associations=None, ext_exclude=None):
    """Returns an ObservatoryExtension object containing additional related information

    @param observatory_id    str
    @param ext_associations    dict
    @param ext_exclude    list
    @retval observatory    ObservatoryExtension
    @throws BadRequest    A parameter is missing
    @throws NotFound    An object with the specified observatory_id does not exist
    """
    if not observatory_id:
        raise BadRequest("The observatory_id parameter is empty")

    handler = ExtendedResourceContainer(self)
    ext_observatory = handler.create_extended_resource_container(
        OT.ObservatoryExtension,
        observatory_id,
        OT.ObservatoryComputedAttributes,
        ext_associations,
        ext_exclude)

    # Strip the attachment payloads; the front end does not need them sent this way.
    # TODO - see if there is a better way to do this in the extended resource framework.
    if hasattr(ext_observatory, 'attachments'):
        for attachment in ext_observatory.attachments:
            if hasattr(attachment, 'content'):
                del attachment.content

    return ext_observatory
开发者ID:pombredanne,项目名称:coi-services,代码行数:31,代码来源:observatory_management_service.py
示例6: get_resource_extension
def get_resource_extension(self, resource_id='', resource_extension='', ext_associations=None, ext_exclude=None):
    """Returns any ExtendedResource object containing additional related information derived from associations

    @param resource_id    str - a single resource id, or the string form of a list of ids
    @param resource_extension    str
    @param ext_associations    dict
    @param ext_exclude    list
    @retval extended_resource    ExtendedResource (or a list of them when a list of ids was given)
    @throws BadRequest    A parameter is missing
    @throws NotFound    An object with the specified resource_id does not exist
    """
    if not resource_id:
        raise BadRequest("The resource_id parameter is empty")
    if not resource_extension:
        raise BadRequest("The extended_resource parameter not set")

    handler = ExtendedResourceContainer(self, self)

    # A '[' anywhere in the parameter means it carries a list of ids, not a single id
    if '[' in resource_id:
        # NOTE(security): eval() executes arbitrary expressions. If resource_id can
        # originate from an untrusted caller this should become ast.literal_eval -
        # confirm with the callers before changing it.
        id_list = eval(resource_id)
        return handler.create_extended_resource_container_list(
            resource_extension, id_list,
            computed_resource_type=None, origin_resource_type=None,
            ext_associations=ext_associations, ext_exclude=ext_exclude)

    return handler.create_extended_resource_container(
        resource_extension, resource_id,
        computed_resource_type=None,
        ext_associations=ext_associations, ext_exclude=ext_exclude)
开发者ID:newbrough,项目名称:pyon,代码行数:30,代码来源:resregistry.py
示例7: get_data_product_extension
def get_data_product_extension(self, data_product_id='', ext_associations=None, ext_exclude=None):
    """Returns a DataProductExtension object containing additional related information

    @param data_product_id    str
    @param ext_associations    dict
    @param ext_exclude    list
    @retval data_product    DataProductExtension
    @throws BadRequest    A parameter is missing
    """
    if not data_product_id:
        raise BadRequest("The data_product_id parameter is empty")

    extended_resource_handler = ExtendedResourceContainer(self)
    extended_product = extended_resource_handler.create_extended_resource_container(
        extended_resource_type=OT.DataProductExtension,
        resource_id=data_product_id,
        computed_resource_type=OT.DataProductComputedAttributes,
        ext_associations=ext_associations,
        ext_exclude=ext_exclude)

    # Strip the attachment payloads; the front end does not need them sent this way.
    # TODO - see if there is a better way to do this in the extended resource framework.
    if hasattr(extended_product, 'attachments'):
        for att in extended_product.attachments:
            if hasattr(att, 'content'):
                delattr(att, 'content')

    # Extract the upstream data product ids from the provenance results.
    # Duplicates are removed on the ids, BEFORE reading: every read() hands back a
    # separate object, so de-duplicating the read results is unreliable and costs
    # one registry read per duplicate.
    upstream_ids = []
    seen_ids = set()
    for producer_info in extended_product.computed.provenance.value.values():
        for dataprodlist in producer_info['inputs'].values():
            for dataprod in dataprodlist:
                if dataprod not in seen_ids:
                    seen_ids.add(dataprod)
                    upstream_ids.append(dataprod)

    # A list (not a set) keeps first-seen order
    extended_product.provenance_product_list = [
        self.clients.resource_registry.read(dataprod) for dataprod in upstream_ids]

    return extended_product
开发者ID:tomoreilly,项目名称:coi-services,代码行数:33,代码来源:data_product_management_service.py
示例8: get_data_process_extension
def get_data_process_extension(self, data_process_id='', ext_associations=None, ext_exclude=None, user_id=''):
    """Returns a DataProcessExtension object containing additional related information

    @param data_process_id    str
    @param ext_associations    dict
    @param ext_exclude    list
    @param user_id    str - passed through to the extended resource container
    @retval data_process    DataProcessExtension
    @throws BadRequest    A parameter is missing
    """
    if not data_process_id:
        # The message names the parameter this method actually takes
        raise BadRequest("The data_process_id parameter is empty")

    extended_resource_handler = ExtendedResourceContainer(self)
    extended_data_process = extended_resource_handler.create_extended_resource_container(
        extended_resource_type=OT.DataProcessExtension,
        resource_id=data_process_id,
        computed_resource_type=OT.DataProcessComputedAttributes,
        ext_associations=ext_associations,
        ext_exclude=ext_exclude,
        user_id=user_id)

    # Strip the attachment payloads; the front end does not need them sent this way.
    # TODO - see if there is a better way to do this in the extended resource framework.
    if hasattr(extended_data_process, 'attachments'):
        for att in extended_data_process.attachments:
            if hasattr(att, 'content'):
                delattr(att, 'content')

    return extended_data_process
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:25,代码来源:data_process_management_service.py
示例9: get_marine_facility_extension
def get_marine_facility_extension(self, org_id='', ext_associations=None, ext_exclude=None):
    """Returns a MarineFacilityOrgExtension object containing additional related information

    @param org_id    str
    @param ext_associations    dict
    @param ext_exclude    list
    @retval org    MarineFacilityOrgExtension
    @throws BadRequest    A parameter is missing
    @throws NotFound    An object with the specified org_id does not exist
    """
    if not org_id:
        raise BadRequest("The org_id parameter is empty")

    extended_resource_handler = ExtendedResourceContainer(self)
    extended_org = extended_resource_handler.create_extended_resource_container(
        extended_resource_type=OT.MarineFacilityOrgExtension,
        resource_id=org_id,
        computed_resource_type=OT.MarineFacilityOrgComputedAttributes,
        ext_associations=ext_associations,
        ext_exclude=ext_exclude)

    def _deployed_of_type(nested, type_name):
        # A compound association returns a list of lists, but only one hasDevice
        # association is permitted between a site and a device, so the only element
        # of each inner list is taken. Empty inner lists are skipped instead of
        # raising IndexError.
        deployed = []
        for inner in nested:
            if not inner:
                continue
            device = inner[0]
            if hasattr(device, 'type_') and device.type_ == type_name:
                deployed.append(device)
        return deployed

    def _has_all(*names):
        # True only if the extension object carries every one of the named fields
        return all(hasattr(extended_org, name) for name in names)

    # Compute the deployed / non-deployed instruments
    if _has_all('instruments', 'instruments_deployed', 'instruments_not_deployed'):
        extended_org.instruments_deployed = _deployed_of_type(
            extended_org.instruments_deployed, 'InstrumentDevice')
        for org_instrument in extended_org.instruments:
            if org_instrument not in extended_org.instruments_deployed:
                extended_org.instruments_not_deployed.append(org_instrument)

    # Compute the deployed / non-deployed platforms
    if _has_all('platforms', 'platforms_deployed', 'platforms_not_deployed'):
        extended_org.platforms_deployed = _deployed_of_type(
            extended_org.platforms_deployed, 'PlatformDevice')
        for org_platform in extended_org.platforms:
            if org_platform not in extended_org.platforms_deployed:
                extended_org.platforms_not_deployed.append(org_platform)

    return extended_org
开发者ID:tomoreilly,项目名称:coi-services,代码行数:57,代码来源:org_management_service.py
示例10: get_deployment_extension
def get_deployment_extension(self, deployment_id='', ext_associations=None, ext_exclude=None, user_id=''):
    """Returns a DeploymentExtension object with the deployment's devices, sites and device models

    Every subject that has a hasDeployment association to the deployment is sorted by
    its type into the instrument/platform device and site lists; the model of each
    device is then looked up and appended to the matching model list in device order.

    @param deployment_id    str
    @param ext_associations    dict
    @param ext_exclude    list
    @param user_id    str - passed through to the extended resource container
    @retval deployment    DeploymentExtension
    @throws BadRequest    A parameter is missing
    """
    if not deployment_id:
        raise BadRequest("The deployment_id parameter is empty")

    extended_resource_handler = ExtendedResourceContainer(self)
    extended_deployment = extended_resource_handler.create_extended_resource_container(
        extended_resource_type=OT.DeploymentExtension,
        resource_id=deployment_id,
        computed_resource_type=OT.DeploymentComputedAttributes,
        ext_associations=ext_associations,
        ext_exclude=ext_exclude,
        user_id=user_id)

    # (device id, hasModel) pairs used below to look up the model associations
    devices = set()

    subjs, _ = self.RR.find_subjects(predicate=PRED.hasDeployment, object=deployment_id, id_only=False)
    for subj in subjs:
        log.debug('get_deployment_extension obj: %s', subj)
        if subj.type_ == "InstrumentDevice":
            extended_deployment.instrument_devices.append(subj)
            devices.add((subj._id, PRED.hasModel))
        elif subj.type_ == "InstrumentSite":
            extended_deployment.instrument_sites.append(subj)
        elif subj.type_ == "PlatformDevice":
            extended_deployment.platform_devices.append(subj)
            devices.add((subj._id, PRED.hasModel))
        elif subj.type_ == "PlatformSite":
            extended_deployment.platform_sites.append(subj)
        else:
            log.warning("get_deployment_extension found invalid type connected to deployment %s. Object details: %s ", deployment_id, subj)

    all_models = set()
    device_to_model_map = {}
    assocs = self.RR.find_associations(anyside=list(devices), id_only=False)
    for assoc in assocs:
        log.debug('get_deployment_extension assoc subj: %s pred: %s obj: %s', assoc.s, assoc.p, assoc.o)
        all_models.add(assoc.o)
        device_to_model_map[assoc.s] = assoc.o

    # Read every distinct model once and index the objects by id
    model_map = {}
    for model_obj in self.RR.read_mult(list(all_models)):
        model_map[model_obj._id] = model_obj

    # NOTE(review): a device without a model association raises KeyError below -
    # confirm whether every deployed device is guaranteed to have a model.
    for instrument in extended_deployment.instrument_devices:
        model_id = device_to_model_map[instrument._id]
        extended_deployment.instrument_models.append(model_map[model_id])

    for platform in extended_deployment.platform_devices:
        model_id = device_to_model_map[platform._id]
        extended_deployment.platform_models.append(model_map[model_id])

    return extended_deployment
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:56,代码来源:observatory_management_service.py
示例11: prepare_user_info_support
def prepare_user_info_support(self, user_info_id=''):
    """
    Returns the object containing the data to create/update a user info resource

    @param user_info_id    str
    @retval resource_data    UserInfoPrepareSupport
    """
    # TODO: Precondition or check: only the actor identity associated with this user
    # or an ORG_MANAGER/ION_MANAGER can edit this user info resource
    handler = ExtendedResourceContainer(self)
    return handler.create_prepare_resource_support(user_info_id, OT.UserInfoPrepareSupport)
开发者ID:ednad,项目名称:coi-services,代码行数:13,代码来源:identity_management_service.py
示例12: get_data_process_definition_extension
def get_data_process_definition_extension(
    self, data_process_definition_id="", ext_associations=None, ext_exclude=None, user_id=""
):
    """Returns a DataProcessDefinitionExtension object containing additional related information

    ``data_products`` is rebuilt as a list aligned with ``data_processes``: entry i
    holds the output product ids of data process i (an empty list when it has none).

    @param data_process_definition_id    str
    @param ext_associations    dict
    @param ext_exclude    list
    @param user_id    str - passed through to the extended resource container
    @retval data_process_definition    DataProcessDefinitionExtension
    @throws BadRequest    A parameter is missing
    """
    if not data_process_definition_id:
        raise BadRequest("The data_process_definition_id parameter is empty")

    extended_resource_handler = ExtendedResourceContainer(self)
    extended_data_process_definition = extended_resource_handler.create_extended_resource_container(
        extended_resource_type=OT.DataProcessDefinitionExtension,
        resource_id=data_process_definition_id,
        computed_resource_type=OT.DataProcessDefinitionComputedAttributes,
        ext_associations=ext_associations,
        ext_exclude=ext_exclude,
        user_id=user_id,
    )

    # Strip the attachment payloads; the front end does not need them sent this way.
    # TODO - see if there is a better way to do this in the extended resource framework.
    if hasattr(extended_data_process_definition, "attachments"):
        for att in extended_data_process_definition.attachments:
            if hasattr(att, "content"):
                delattr(att, "content")

    data_process_ids = set()
    for dp in extended_data_process_definition.data_processes:
        data_process_ids.add(dp._id)

    # Collect the output product ids of each data process, keyed by data process id
    products_map = {}
    objects, associations = self.clients.resource_registry.find_objects_mult(
        subjects=list(data_process_ids), id_only=False
    )
    for _, assoc in zip(objects, associations):
        # only hasOutputProduct associations are of interest here
        if assoc.p == PRED.hasOutputProduct:
            products_map.setdefault(assoc.s, []).append(assoc.o)

    # Build the final list aligned with data_processes. A data process without any
    # output product gets an empty list instead of raising KeyError.
    extended_data_process_definition.data_products = [
        products_map.get(dp._id, []) for dp in extended_data_process_definition.data_processes
    ]

    return extended_data_process_definition
开发者ID:oldpatricka,项目名称:coi-services,代码行数:50,代码来源:data_process_management_service.py
示例13: prepare_resource_support
def prepare_resource_support(self, resource_type='', resource_id=''):
    """Returns a structured dict with information to help create/update a resource

    @param resource_type    str
    @param resource_id    str
    @retval resource_data    GenericPrepareSupport
    @throws BadRequest    A parameter is missing
    @throws NotFound    An object with the specified resource_id does not exist
    """
    if not resource_type:
        raise BadRequest("The resource_type parameter is required")

    handler = ExtendedResourceContainer(self, self)
    support = handler.create_prepare_resource_support(
        resource_id=resource_id,
        prepare_resource_type=OT.GenericPrepareSupport,
        origin_resource_type=resource_type)

    # Fill out the service request information for creating and then updating the
    # resource; both go to the resource_registry with the object as only argument.
    for operation in ('create', 'update'):
        request = getattr(support, operation + '_request')
        handler.set_service_requests(request, 'resource_registry', operation, {"object": "$(object)"})

    return support
开发者ID:caseybryant,项目名称:pyon,代码行数:26,代码来源:resregistry.py
示例14: test_create_extended_resource_container
def test_create_extended_resource_container(self):
mock_clients = self._create_service_mock('resource_registry')
self.clients = mock_clients
self.container = Mock()
extended_resource_handler = ExtendedResourceContainer(self, mock_clients.resource_registry)
instrument_device = Mock()
instrument_device._id = '123'
instrument_device.name = "MyInstrument"
instrument_device.type_ = RT.InstrumentDevice
instrument_device.lcstate = LCS.DRAFT
instrument_device.availability = AS.PRIVATE
instrument_device2 = Mock()
instrument_device2._id = '456'
instrument_device2.name = "MyInstrument2"
instrument_device2.type_ = RT.InstrumentDevice
actor_identity = Mock()
actor_identity._id = '111'
actor_identity.name = "Foo"
actor_identity.type_ = RT.ActorIdentity
actor_identity = Mock()
actor_identity._id = '1112'
actor_identity.name = "Foo2"
actor_identity.type_ = RT.ActorIdentity
user_info = Mock()
user_info._id = '444'
user_info.name = "John Doe"
user_info.email = "[email protected]"
user_info.phone = "555-555-5555"
user_info.variables = [{"name": "subscribeToMailingList", "value": "False"}]
user_info2 = Mock()
user_info2._id = '445'
user_info2.name = "aka Evil Twin"
user_info2.email = "[email protected]"
user_info2.phone = "555-555-5555"
user_info2.variables = [{"name": "subscribeToMailingList", "value": "False"}]
# ActorIdentity to UserInfo association
actor_identity_to_info_association = Mock()
actor_identity_to_info_association._id = '555'
actor_identity_to_info_association.s = "111"
actor_identity_to_info_association.st = RT.ActorIdentity
actor_identity_to_info_association.p = PRED.hasInfo
actor_identity_to_info_association.o = "444"
actor_identity_to_info_association.ot = RT.UserInfo
# ActorIdentity to UserInfo association
actor_identity_to_info_association2 = Mock()
actor_identity_to_info_association2._id = '556'
actor_identity_to_info_association2.s = "1112"
actor_identity_to_info_association2.st = RT.ActorIdentity
actor_identity_to_info_association2.p = PRED.hasInfo
actor_identity_to_info_association2.o = "445"
actor_identity_to_info_association2.ot = RT.UserInfo
# ActorIdentity to Instrument Device association
Instrument_device_to_actor_identity_association = Mock()
Instrument_device_to_actor_identity_association._id = '666'
Instrument_device_to_actor_identity_association.s = "123"
Instrument_device_to_actor_identity_association.st = RT.InstrumentDevice
Instrument_device_to_actor_identity_association.p = PRED.hasOwner
Instrument_device_to_actor_identity_association.o = "111"
Instrument_device_to_actor_identity_association.ot = RT.ActorIdentity
# ActorIdentity to Instrument Device association
Instrument_device_to_actor_identity_association2 = Mock()
Instrument_device_to_actor_identity_association2._id = '667'
Instrument_device_to_actor_identity_association2.s = "456"
Instrument_device_to_actor_identity_association2.st = RT.InstrumentDevice
Instrument_device_to_actor_identity_association2.p = PRED.hasOwner
Instrument_device_to_actor_identity_association2.o = "111"
Instrument_device_to_actor_identity_association2.ot = RT.ActorIdentity
with self.assertRaises(BadRequest) as cm:
extended_user = extended_resource_handler.create_extended_resource_container(RT.ActorIdentity, '111')
self.assertIn( 'The requested resource ActorIdentity is not extended from ResourceContainer',cm.exception.message)
mock_clients.resource_registry.read.return_value = instrument_device
mock_clients.resource_registry.find_objects.return_value = ([actor_identity], [Instrument_device_to_actor_identity_association])
mock_clients.resource_registry.find_subjects.return_value = (None,None)
mock_clients.resource_registry.find_associations.return_value = [actor_identity_to_info_association, Instrument_device_to_actor_identity_association]
mock_clients.resource_registry.read_mult.return_value = [user_info]
extended_res = extended_resource_handler.create_extended_resource_container(OT.TestExtendedResource, '123')
self.assertEquals(extended_res.resource, instrument_device)
self.assertEquals(len(extended_res.owners),2)
self.assertEquals(extended_res.resource_object.type_, RT.SystemResource)
#.........这里部分代码省略.........
开发者ID:j2project,项目名称:pyon,代码行数:101,代码来源:test_resource.py
示例15: test_extended_resource_directed
def test_extended_resource_directed(self):
    """Check parent/child site fields of TestExtendedResourceSite as hasSite associations change.

    The extended resource for PlatformSite 1 is rebuilt after every association
    change and its parent_site / child_sites / child_sites1 / child_instrument_sites
    fields are verified.
    """
    obs1_id, _ = self.RR.create(IonObject(RT.Observatory, name="Observatory 1"))
    ps1_id, _ = self.RR.create(IonObject(RT.PlatformSite, name="PlatformSite 1"))
    ps2_id, _ = self.RR.create(IonObject(RT.PlatformSite, name="PlatformSite 2"))
    is1_id, _ = self.RR.create(IonObject(RT.InstrumentSite, name="InstrumentSite 1"))

    def load_site():
        # Build a fresh extended resource for PlatformSite 1 from the current registry state
        handler = ExtendedResourceContainer(self)
        return handler.create_extended_resource_container(
            extended_resource_type=OT.TestExtendedResourceSite,
            resource_id=ps1_id,
            computed_resource_type=OT.BaseComputedAttributes,
        )

    def check_children(ext_site, child_ids, instrument_ids):
        # Both child site fields must hold exactly child_ids; the instrument field exactly instrument_ids
        for field_name, expected in (("child_sites", child_ids),
                                     ("child_sites1", child_ids),
                                     ("child_instrument_sites", instrument_ids)):
            actual = getattr(ext_site, field_name)
            self.assertEqual(len(actual), len(expected))
            self.assertEqual(set(r._id for r in actual), set(expected))

    # No associations yet: no parent, no children
    ext_site = load_site()
    self.assertIsNone(ext_site.parent_site)
    self.assertEqual(ext_site.child_sites, [])
    self.assertEqual(ext_site.child_sites1, [])
    self.assertEqual(ext_site.child_instrument_sites, [])

    # Observatory 1 -> PlatformSite 1: parent appears, still no children
    aid1, _ = self.RR.create_association(obs1_id, PRED.hasSite, ps1_id)
    ext_site = load_site()
    self.assertEqual(ext_site.parent_site._id, obs1_id)
    self.assertEqual(ext_site.child_sites, [])
    self.assertEqual(ext_site.child_sites1, [])
    self.assertEqual(ext_site.child_instrument_sites, [])

    # PlatformSite 1 -> InstrumentSite 1: one child, which is also an instrument site
    self.RR.create_association(ps1_id, PRED.hasSite, is1_id)
    ext_site = load_site()
    self.assertEqual(ext_site.parent_site._id, obs1_id)
    check_children(ext_site, [is1_id], [is1_id])

    # PlatformSite 1 -> PlatformSite 2: two children, still one instrument site
    self.RR.create_association(ps1_id, PRED.hasSite, ps2_id)
    ext_site = load_site()
    self.assertEqual(ext_site.parent_site._id, obs1_id)
    check_children(ext_site, [is1_id, ps2_id], [is1_id])

    # Removing the observatory association drops the parent but keeps the children
    self.RR.delete_association(aid1)
    ext_site = load_site()
    self.assertIsNone(ext_site.parent_site)
    check_children(ext_site, [is1_id, ps2_id], [is1_id])
开发者ID:rumineykova,项目名称:pyon,代码行数:99,代码来源:test_resource_extended_int.py
示例16: test_extended_resource
def test_extended_resource(self):
    """Check the device_* / site_* fields of TestExtendedResourceDeploy.

    First a single InstrumentDevice is associated with the deployment, then an
    InstrumentSite, a second InstrumentDevice and a PlatformDevice are added and
    the extended resource is rebuilt and verified again.
    """
    dev_id, _ = self.RR.create(IonObject(RT.InstrumentDevice, name="InstrumentDevice 1"))
    dev2_id, _ = self.RR.create(IonObject(RT.InstrumentDevice, name="InstrumentDevice 2"))
    dev3_id, _ = self.RR.create(IonObject(RT.PlatformDevice, name="PlatformDevice 3"))
    site_id, _ = self.RR.create(IonObject(RT.InstrumentSite, name="InstrumentSite 1"))
    dep_id, _ = self.RR.create(IonObject(RT.Deployment, name="Deployment 1"))

    def load_deployment():
        # Build a fresh extended resource for the deployment from the current registry state
        handler = ExtendedResourceContainer(self)
        return handler.create_extended_resource_container(
            extended_resource_type=OT.TestExtendedResourceDeploy,
            resource_id=dep_id,
            computed_resource_type=OT.BaseComputedAttributes,
        )

    # --- One instrument device deployed, no site ---
    self.RR.create_association(dev_id, PRED.hasDeployment, dep_id)
    res_objs, _ = self.RR.find_subjects(predicate=PRED.hasDeployment, object=dep_id, id_only=True)
    self.assertEqual(len(res_objs), 1)

    ext_deployment = load_deployment()

    self.assertEqual(ext_deployment.device_11._id, dev_id)
    self.assertEqual(len(ext_deployment.device_12), 1)
    self.assertEqual(ext_deployment.device_12[0]._id, dev_id)
    self.assertEqual(ext_deployment.device_13, 1)
    self.assertEqual(ext_deployment.device_14._id, dev_id)

    self.assertEqual(ext_deployment.device_21._id, dev_id)
    # The length check guards the device_22[0] access on the next line
    # (the original re-checked device_12 here by copy-paste).
    self.assertEqual(len(ext_deployment.device_22), 1)
    self.assertEqual(ext_deployment.device_22[0]._id, dev_id)
    self.assertEqual(ext_deployment.device_23, 1)
    self.assertEqual(ext_deployment.device_24._id, dev_id)

    self.assertEqual(ext_deployment.device_31._id, dev_id)

    self.assertIsNone(ext_deployment.site_11)
    self.assertEqual(ext_deployment.site_12, [])
    self.assertEqual(ext_deployment.site_13, 0)
    self.assertIsNone(ext_deployment.site_14)
    self.assertIsNone(ext_deployment.site_21)
    self.assertEqual(ext_deployment.site_22, [])
    self.assertEqual(ext_deployment.site_23, 0)
    self.assertIsNone(ext_deployment.site_24)
    self.assertIsNone(ext_deployment.site_31)

    # --- Add a site, a second instrument device and a platform device ---
    self.RR.create_association(site_id, PRED.hasDeployment, dep_id)
    self.RR.create_association(dev2_id, PRED.hasDeployment, dep_id)
    self.RR.create_association(dev3_id, PRED.hasDeployment, dep_id)
    res_objs, _ = self.RR.find_subjects(predicate=PRED.hasDeployment, object=dep_id, id_only=True)
    self.assertEqual(len(res_objs), 4)

    ext_deployment = load_deployment()

    all_devices = [dev_id, dev2_id, dev3_id]
    all_instdevs = [dev_id, dev2_id]

    self.assertIn(ext_deployment.device_11._id, all_instdevs)
    self.assertEqual(len(ext_deployment.device_12), 2)
    self.assertEqual(set(r._id for r in ext_deployment.device_12), set(all_instdevs))
    self.assertEqual(ext_deployment.device_13, 2)
    self.assertIn(ext_deployment.device_14._id, all_instdevs)

    self.assertIn(ext_deployment.device_21._id, all_devices)
    self.assertEqual(len(ext_deployment.device_22), 3)
    self.assertEqual(set(r._id for r in ext_deployment.device_22), set(all_devices))
    self.assertEqual(ext_deployment.device_23, 3)
    self.assertIn(ext_deployment.device_24._id, all_devices)

    self.assertIn(ext_deployment.device_31._id, all_devices)

    self.assertEqual(ext_deployment.site_11._id, site_id)
    self.assertEqual(len(ext_deployment.site_12), 1)
    self.assertEqual(ext_deployment.site_12[0]._id, site_id)
    self.assertEqual(ext_deployment.site_13, 1)
    self.assertEqual(ext_deployment.site_14._id, site_id)
    self.assertEqual(ext_deployment.site_21._id, site_id)
    self.assertEqual(len(ext_deployment.site_22), 1)
    self.assertEqual(ext_deployment.site_22[0]._id, site_id)
    self.assertEqual(ext_deployment.site_23, 1)
    self.assertEqual(ext_deployment.site_24._id, site_id)
    self.assertEqual(ext_deployment.site_31._id, site_id)
开发者ID:rumineykova,项目名称:pyon,代码行数:100,代码来源:test_resource_extended_int.py
示例17: get_data_product_extension
def get_data_product_extension(self, data_product_id='', ext_associations=None, ext_exclude=None, user_id=''):
#Returns an DataProductExtension object containing additional related information
if not data_product_id:
raise BadRequest("The data_product_id parameter is empty")
extended_resource_handler = ExtendedResourceContainer(self)
extended_product = extended_resource_handler.create_extended_resource_container(
extended_resource_type=OT.DataProductExtension,
resource_id=data_product_id,
computed_resource_type=OT.DataProductComputedAttributes,
ext_associations=ext_associations,
ext_exclude=ext_exclude,
|
请发表评论