本文整理汇总了Python中pyon.datastore.datastore.DatastoreManager类的典型用法代码示例。如果您正苦于以下问题:Python DatastoreManager类的具体用法?Python DatastoreManager怎么用?Python DatastoreManager使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了DatastoreManager类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _load_datastore
def _load_datastore(cls, path=None, ds_name=None, ignore_errors=True):
    """
    Loads every file found in a directory as a YAML document and writes all
    of them into a datastore with one bulk call.
    @param path Directory containing one YAML file per document
    @param ds_name Name of the datastore to load into
    @param ignore_errors if True, read/parse/store failures are logged and
        skipped; if False, the original exception is re-raised
    """
    if not DatastoreManager.exists(ds_name):
        # Only a warning: the datastore instance is still requested below.
        log.warn("Datastore does not exist: %s" % ds_name)
    ds = DatastoreManager.get_datastore_instance(ds_name)

    objects = []
    for fn in os.listdir(path):
        fp = os.path.join(path, fn)
        try:
            with open(fp, 'r') as f:
                yaml_text = f.read()
            # NOTE(review): yaml.load can build arbitrary objects; keep the
            # dump directory trusted, or confirm that a safe loader can
            # represent every dumped document before switching.
            obj = yaml.load(yaml_text)
            # Drop the stored revision so the document is written as new
            if "_rev" in obj:
                del obj["_rev"]
            objects.append(obj)
        except Exception as ex:
            if not ignore_errors:
                # Bare raise keeps the original traceback
                raise
            log.warn("load error id=%s err=%s" % (fn, str(ex)))

    if not objects:
        return

    try:
        res = ds.create_doc_mult(objects, allow_ids=True)
        log.info("DatastoreLoader: Loaded %s objects into %s" % (len(res), ds_name))
    except Exception as ex:
        if not ignore_errors:
            raise
        # The failure concerns the whole batch, so report the datastore and
        # not the name of whichever file happened to be read last.
        log.warn("bulk load error ds=%s err=%s" % (ds_name, str(ex)))
开发者ID:dstuebe,项目名称:coi-services,代码行数:29,代码来源:datastore_loader.py
示例2: clear_datastore
def clear_datastore(cls, ds_name=None, prefix=None):
    """
    Deletes either one datastore (by name) or all datastores whose name
    starts with a prefix. Does nothing but warn when running on MockDB or
    when neither argument is given.
    @param ds_name Datastore name; tried as unqualified name first, then as
        logical (scoped) name
    @param prefix Name prefix selecting the datastores to delete; only used
        when ds_name is not given
    """
    if CFG.system.mockdb:
        log.warn("Cannot clear MockDB")
        return

    generic_ds = DatastoreManager.get_datastore_instance("")

    if ds_name:
        # First interpret ds_name as unqualified name, then as logical name
        known = (DatastoreManager.exists(ds_name, scoped=False) or
                 DatastoreManager.exists(ds_name, scoped=True))
        if known:
            generic_ds.delete_datastore(ds_name)
        else:
            log.warn("Datastore does not exist: %s" % ds_name)
        return

    if prefix:
        cleared = ignored = 0
        for name in generic_ds.list_datastores():
            if not name.startswith(prefix):
                ignored += 1
                continue
            generic_ds.delete_datastore(name)
            log.debug("Cleared couch datastore '%s'" % name)
            cleared += 1
        log.info("Cleared %d couch datastores, ignored %d" % (cleared, ignored))
        return

    log.warn("Cannot clear datastore without prefix or datastore name")
开发者ID:daf,项目名称:coi-services,代码行数:30,代码来源:datastore_loader.py
示例3: _dump_datastore
def _dump_datastore(cls, outpath_base, ds_name, clear_dir=True):
    """
    Writes every document of one datastore as a YAML file into
    "<outpath_base>/<ds_name>", one file per document id.
    @param outpath_base Base directory; created if missing
    @param ds_name Name of the datastore to dump; also the sub-directory name
    @param clear_dir if True, delete the existing entries of the target
        directory before writing
    """
    if not DatastoreManager.exists(ds_name):
        log.warn("Datastore does not exist: %s" % ds_name)
        return
    ds = DatastoreManager.get_datastore_instance(ds_name)

    if not os.path.exists(outpath_base):
        os.makedirs(outpath_base)

    outpath = "%s/%s" % (outpath_base, ds_name)
    if not os.path.exists(outpath):
        os.makedirs(outpath)

    if clear_dir:
        # Plain loop: the removals are side effects, not a list to build
        for existing in os.listdir(outpath):
            os.remove(os.path.join(outpath, existing))

    objs = ds.find_by_view("_all_docs", None, id_only=False, convert_doc=False)
    numwrites = 0
    for obj_id, _obj_key, obj in objs:
        # Some object ids have slashes, which cannot appear in a file name
        fn = obj_id.replace("/", "_")
        with open("%s/%s.yml" % (outpath, fn), "w") as f:
            yaml.dump(obj, f, default_flow_style=False)
        numwrites += 1
    log.info("Wrote %s objects to %s" % (numwrites, outpath))
开发者ID:daf,项目名称:coi-services,代码行数:25,代码来源:datastore_loader.py
示例4: _load_datastore
def _load_datastore(cls, path=None, ds_name=None, ignore_errors=True):
    """
    Loads every file found in a directory into a datastore, one file at a
    time, through cls._read_and_create_obj.
    @param path Directory containing the files to load
    @param ds_name Name of the datastore to load into
    @param ignore_errors if True, a failing file is logged and skipped;
        if False, the original exception is re-raised
    """
    if not DatastoreManager.exists(ds_name):
        # Only a warning: the datastore instance is still requested below.
        log.warn("Datastore does not exist: %s" % ds_name)
    ds = DatastoreManager.get_datastore_instance(ds_name)

    for fn in os.listdir(path):
        fp = os.path.join(path, fn)
        try:
            cls._read_and_create_obj(ds, fp)
        except Exception as ex:
            if not ignore_errors:
                # Bare raise keeps the original traceback
                raise
            log.warn("load error id=%s err=%s" % (fn, str(ex)))
开发者ID:daf,项目名称:coi-services,代码行数:13,代码来源:datastore_loader.py
示例5: test_conv_repo
def test_conv_repo(self):
    """Stores one conversation message and reads it back from the repository."""
    dsm = DatastoreManager()

    # Recreate the "conversations" datastore so the test starts empty
    ds = dsm.get_datastore("conversations")
    ds.delete_datastore()
    ds.create_datastore()

    conv_repo = ConvRepository(dsm)

    conv1 = ConversationMessage(sender='sender', recipient='receiver', conversation_id='1',
                                protocol='rpc', headers={'nofield': 'novalue'})
    conv_id, _ = conv_repo.put_conv(conv1)

    conv1r = conv_repo.conv_store.read(conv_id)
    # assertEqual is the supported spelling; assertEquals is a deprecated alias
    self.assertEqual(conv1.sender, conv1r.sender)
开发者ID:ooici,项目名称:pyon,代码行数:13,代码来源:test_conversation_log.py
示例6: test_event_repo
def test_event_repo(self):
    """Exercises put_event / get_event / find_events on a fresh "events" datastore."""
    dsm = DatastoreManager()

    # Recreate the "events" datastore so the counts below are exact
    ds = dsm.get_datastore("events")
    ds.delete_datastore()
    ds.create_datastore()

    event_repo = EventRepository(dsm)
    # Second repository on the same manager; never used afterwards.
    # NOTE(review): presumably checks that a second instance can be created - confirm
    event_repo1 = EventRepository(dsm)

    # Round trip of a single event
    event1 = Event(origin="resource1")
    event_id, _ = event_repo.put_event(event1)

    event1r = event_repo.get_event(event_id)
    self.assertEqual(event1.origin, event1r.origin)

    # Five events of one origin with consecutive timestamps ts .. ts+4
    ts = 1328680477138
    for i in range(5):
        ev = Event(origin="resource2", ts_created=str(ts + i))
        event_id, _ = event_repo.put_event(ev)

    events_r = event_repo.find_events(origin='resource2')
    self.assertEqual(len(events_r), 5)

    events_r = event_repo.find_events(origin='resource2', descending=True)
    self.assertEqual(len(events_r), 5)

    events_r = event_repo.find_events(origin='resource2', limit=3)
    self.assertEqual(len(events_r), 3)

    events_r = event_repo.find_events(origin='resource2', start_ts=str(ts+3))
    self.assertEqual(len(events_r), 2)

    events_r = event_repo.find_events(origin='resource2', end_ts=str(ts+2))
    self.assertEqual(len(events_r), 3)

    events_r = event_repo.find_events(origin='resource2', start_ts=str(ts+3), end_ts=str(ts+4))
    self.assertEqual(len(events_r), 2)

    # Time range query without an origin
    events_r = event_repo.find_events(start_ts=str(ts+3), end_ts=str(ts+4))
    self.assertEqual(len(events_r), 2)

    # Query by event type
    event3 = ResourceLifecycleEvent(origin="resource3")
    event_id, _ = event_repo.put_event(event3)

    events_r = event_repo.find_events(event_type="ResourceLifecycleEvent")
    self.assertEqual(len(events_r), 1)
开发者ID:daf,项目名称:pyon,代码行数:49,代码来源:test_event.py
示例7: __init__
def __init__(self):
    """
    Bootstraps pyon and binds the 'coverage' and 'coverage_spans' datastores,
    remembering the name each datastore reports for itself.
    @throws RuntimeError if either datastore cannot be obtained
    """
    bootstrap_pyon()
    dsm = DatastoreManager()

    self.datastore = dsm.get_datastore(ds_name='coverage')
    if self.datastore is None:
        raise RuntimeError("Unable to load datastore for coverage")
    self.entity_table_name = self.datastore._get_datastore_name()
    # Lazy logger arguments, same form as the span store message below
    log.trace("Got datastore: %s type %s", self.datastore._get_datastore_name(), type(self.datastore))

    self.span_store = dsm.get_datastore(ds_name='coverage_spans')
    if self.span_store is None:
        raise RuntimeError("Unable to load datastore for coverage_spans")
    self.span_table_name = self.span_store._get_datastore_name()
    log.trace("Got datastore: %s type %s", self.span_store._get_datastore_name(), type(self.span_store))
开发者ID:ooici,项目名称:coverage-model,代码行数:15,代码来源:db_connectors.py
示例8: on_start
def on_start(self):
    """
    Sets up the collaborators of the service: event repository, SMTP client,
    e-mail event processor, configured event types and the service clients.
    """
    # --- Event repository of the container ---
    self.event_repo = self.container.instance.event_repository

    self.smtp_client = setting_up_smtp_client()

    # NOTE(review): this literal looks like a redacted placeholder rather
    # than a real address - confirm the intended sender address
    self.ION_NOTIFICATION_EMAIL_ADDRESS = '[email protected]'

    # --- Event processor that sends e-mail through the SMTP client ---
    self.event_processor = EmailEventProcessor(self.smtp_client)

    # --- Event types from configuration; the event table starts empty ---
    self.event_types = CFG.event.types
    self.event_table = {}

    # --- Clients and helpers ---
    self.discovery = DiscoveryServiceClient()
    self.process_dispatcher = ProcessDispatcherServiceClient()
    self.datastore_manager = DatastoreManager()

    self.event_publisher = EventPublisher()
    self.scheduler_service = SchedulerService()
开发者ID:pombredanne,项目名称:coi-services,代码行数:35,代码来源:user_notification_service.py
示例9: delete_ui
def delete_ui(self):
    """
    Looks up the ids of all resources of the UI resource types, reads their
    documents from the "resources" datastore, flags every document with
    _deleted and writes them back in one bulk call.
    """
    ui_types = (
        'UIInternalResourceType',
        'UIInformationLevel',
        'UIScreenLabel',
        'UIAttribute',
        'UIBlock',
        'UIGroup',
        'UIRepresentation',
        'UIResourceType',
        'UIView',
        'UIBlockAttribute',
        'UIBlockRepresentation',
        'UIGroupBlock',
        'UIViewGroup',
    )

    # Collect the ids type by type through the resource registry
    res_ids = []
    for restype in ui_types:
        found_ids, _ = self.container.resource_registry.find_resources(restype, id_only=True)
        res_ids.extend(found_ids)
        log.debug("Found %s resources of type %s" % (len(found_ids), restype))

    ds = DatastoreManager.get_datastore_instance("resources")
    docs = ds.read_doc_mult(res_ids)

    for ui_doc in docs:
        ui_doc['_deleted'] = True

    ds.create_doc_mult(docs, allow_ids=True)
开发者ID:ooici-eoi,项目名称:coi-services,代码行数:30,代码来源:ion_loader.py
示例10: __init__
def __init__(self, *args, **kwargs):
    """
    Initializes the container: assigns id and name, registers itself as
    Container.instance, merges kwargs into CFG, bootstraps pyon and creates
    the exchange, process, app and datastore managers.
    @param kwargs Configuration overrides (e.g. from the command line);
        merged into CFG and kept as spawn_args
    """
    BaseContainerAgent.__init__(self, *args, **kwargs)

    # set id and name (as they are set in base class call)
    # "<hostname>_<pid>" with dots replaced; str.replace behaves like the
    # Python 2-only module function string.replace
    self.id = ('%s_%d' % (os.uname()[1], os.getpid())).replace(".", "_")
    self.name = "cc_agent_%s" % self.id

    Container.instance = self

    # TODO: Bug: Replacing CFG instance not work because references are already public. Update directly
    dict_merge(CFG, kwargs)
    from pyon.core import bootstrap
    bootstrap.sys_name = CFG.system.name or bootstrap.sys_name
    log.debug("Container (sysname=%s) initializing ..." % bootstrap.sys_name)

    # Keep track of the overrides from the command-line, so they can trump app/rel file data
    self.spawn_args = DictModifier(CFG, kwargs)

    # Load object and service registry etc.
    bootstrap_pyon()

    # Create this Container's specific ExchangeManager instance
    self.ex_manager = ExchangeManager(self)

    # Create this Container's specific ProcManager instance
    self.proc_manager = ProcManager(self)

    # Create this Container's specific AppManager instance
    self.app_manager = AppManager(self)

    # DatastoreManager - controls access to Datastores (both mock and couch backed)
    self.datastore_manager = DatastoreManager()

    log.debug("Container initialized, OK.")
开发者ID:wfrench,项目名称:pyon,代码行数:34,代码来源:cc.py
示例11: create_resources_snapshot
def create_resources_snapshot(self, persist=False, filename=None):
    """
    Builds a snapshot of the resources datastore: a dict with the keys
    "resources" (id -> ts_updated) and "associations" (id -> ts).
    Design documents are skipped.
    @param persist if True, also write the snapshot as JSON to a file
    @param filename Target file; defaults to a timestamped file name under
        "interface/"
    @retval snapshot dict
    @throws Inconsistent if a document is not a dict or has no type_
    """
    ds = DatastoreManager.get_datastore_instance(DataStore.DS_RESOURCES, DataStore.DS_PROFILE.RESOURCES)
    all_objs = ds.find_docs_by_view("_all_docs", None, id_only=False)
    log.info("Found %s objects in datastore resources", len(all_objs))

    resources, associations = {}, {}
    snapshot = dict(resources=resources, associations=associations)

    for obj_id, _key, obj in all_objs:
        if obj_id.startswith("_design"):
            continue
        if not isinstance(obj, dict):
            raise Inconsistent("Object of bad type found: %s" % type(obj))

        obj_type = obj.get("type_", None)
        if not obj_type:
            raise Inconsistent("Object with no type_ found: %s" % obj)
        if obj_type == "Association":
            associations[obj_id] = obj.get("ts", None)
        else:
            resources[obj_id] = obj.get("ts_updated", None)

    if persist:
        if filename:
            path = filename
        else:
            stamp = datetime.datetime.today().strftime('%Y%m%d_%H%M%S')
            path = "interface/rrsnapshot_%s.json" % stamp
        # Serialize before opening, so a failure does not leave an empty file
        snapshot_json = json.dumps(snapshot)
        with open(path, "w") as f:
            f.write(snapshot_json)

    log.debug("Created resource registry snapshot. %s resources, %s associations", len(resources), len(associations))

    return snapshot
开发者ID:edwardhunter,项目名称:scioncc,代码行数:34,代码来源:resources.py
示例12: __init__
def __init__(self, *args, **kwargs):
    """
    Initializes the container: assigns id and name, registers itself as
    Container.instance and bootstrap.container_instance, merges kwargs into
    CFG, bootstraps pyon and creates the container's managers.
    @param kwargs Configuration overrides (e.g. from the command line);
        merged into CFG and kept as spawn_args
    """
    BaseContainerAgent.__init__(self, *args, **kwargs)

    self._is_started = False

    # set id and name (as they are set in base class call)
    # "<hostname>_<pid>" with dots replaced; str.replace behaves like the
    # Python 2-only module function string.replace
    self.id = ('%s_%d' % (os.uname()[1], os.getpid())).replace(".", "_")
    self.name = "cc_agent_%s" % self.id

    Container.instance = self

    # TODO: Bug: Replacing CFG instance not work because references are already public. Update directly
    dict_merge(CFG, kwargs, inplace=True)
    from pyon.core import bootstrap
    bootstrap.container_instance = self
    bootstrap.assert_configuration(CFG)
    log.debug("Container (sysname=%s) initializing ..." % bootstrap.get_sys_name())

    # Keep track of the overrides from the command-line, so they can trump app/rel file data
    self.spawn_args = kwargs

    # Load object and service registry etc.
    bootstrap_pyon()

    # Create this Container's specific ExchangeManager instance
    self.ex_manager = ExchangeManager(self)

    # Create this Container's specific ProcManager instance
    self.proc_manager = ProcManager(self)

    # Create this Container's specific AppManager instance
    self.app_manager = AppManager(self)

    # DatastoreManager - controls access to Datastores (both mock and couch backed)
    self.datastore_manager = DatastoreManager()

    # File System - Interface to the OS File System, using correct path names and setups
    self.file_system = FileSystem(CFG)

    # Governance Controller - manages the governance related interceptors
    self.governance_controller = GovernanceController(self)

    # sFlow manager - controls sFlow stat emission
    self.sflow_manager = SFlowManager(self)

    # Coordinates the container start
    # (_is_started is assigned a second time here; kept because the managers
    # created above receive this container and run in between)
    self._is_started = False
    self._capabilities = []
    self._status = "INIT"

    # protection for when the container itself is used as a Process for clients
    self.container = self

    log.debug("Container initialized, OK.")
开发者ID:ooici-dm,项目名称:pyon,代码行数:54,代码来源:cc.py
示例13: get_blame_objects
def get_blame_objects(cls):
    """
    Collects, per datastore, all documents that contain a "blame_" key.
    @retval dict mapping datastore name to the list of such documents
    """
    blame_objs = {}
    for ds_name in ['resources', 'objects', 'state', 'events', 'directory', 'scidata']:
        ds = DatastoreManager.get_datastore_instance(ds_name)
        rows = ds.find_by_view("_all_docs", None, id_only=False, convert_doc=False)
        blame_objs[ds_name] = [doc for _doc_id, _doc_key, doc in rows if "blame_" in doc]
    return blame_objs
开发者ID:seman,项目名称:coi-services,代码行数:12,代码来源:datastore_loader.py
示例14: __init__
def __init__(self, *args, **kwargs):
    """
    Initializes the container: assigns id and name, registers itself as the
    active container, starts the DatastoreManager and creates the directory
    and the remaining managers.
    @param kwargs Overrides (e.g. from the command line), kept as spawn_args
    """
    BaseContainerAgent.__init__(self, *args, **kwargs)

    self._is_started = False

    # Container id and cc_agent name (the base class call sets them as well)
    self.id = get_default_container_id()
    self.name = "cc_agent_%s" % self.id
    self._capabilities = []

    # Publish this container through the bootstrap module and the class attribute
    from pyon.core import bootstrap
    bootstrap.container_instance = self
    Container.instance = self

    log.debug("Container (sysname=%s) initializing ..." % bootstrap.get_sys_name())

    # Command-line overrides are remembered so they can trump app/rel file data
    self.spawn_args = kwargs

    # Access to Datastores (both mock and couch backed)
    self.datastore_manager = DatastoreManager()

    # TODO: Do not start a capability here. Symmetric start/stop
    self.datastore_manager.start()
    self._capabilities.append("DATASTORE_MANAGER")

    # Directory
    self.directory = Directory()

    # Container specific exchange, process and app managers
    self.ex_manager = ExchangeManager(self)
    self.proc_manager = ProcManager(self)
    self.app_manager = AppManager(self)

    # Interface to the OS file system, using correct path names and setups
    self.file_system = FileSystem(CFG)

    # Manages the governance related interceptors
    self.governance_controller = GovernanceController(self)

    # Controls sFlow stat emission
    self.sflow_manager = SFlowManager(self)

    # Status used to coordinate the container start
    self._status = "INIT"

    # Lets the container itself be used as a Process for clients
    self.container = self

    log.debug("Container initialized, OK.")
开发者ID:swarbhanu,项目名称:pyon,代码行数:53,代码来源:cc.py
示例15: delete_ooi_assets
def delete_ooi_assets(self):
    """
    Deletes all resources of the OOI asset types, and all associations that
    have such a resource as subject or object, from the resources datastore.
    The documents are reduced to deletion stubs and written back with two
    bulk updates (resources first, then associations).
    """
    ooi_asset_types = ['InstrumentModel',
                       'PlatformModel',
                       'Observatory',
                       'Subsite',
                       'PlatformSite',
                       'InstrumentSite',
                       'InstrumentAgent',
                       'InstrumentAgentInstance',
                       'InstrumentDevice',
                       'PlatformAgent',
                       'PlatformAgentInstance',
                       'PlatformDevice',
                       'Deployment',
                       'DataProduct'
                       ]

    def _is_candidate(obj_id, obj):
        # Design documents and non-dict rows are never touched
        return not obj_id.startswith("_design") and isinstance(obj, dict)

    def _tombstone(doc):
        # Reduce the document, in place, to its stub: _id, _rev, _deleted=True
        doc_id, doc_rev = doc['_id'], doc['_rev']
        doc.clear()
        doc.update(dict(_id=doc_id, _rev=doc_rev, _deleted=True))

    self.resource_ds = DatastoreManager.get_datastore_instance(DataStore.DS_RESOURCES, DataStore.DS_PROFILE.RESOURCES)

    del_objs = {}
    del_assocs = {}
    all_objs = self.resource_ds.find_by_view("_all_docs", None, id_only=False, convert_doc=False)

    # Pass 1: resources of the listed types
    for obj_id, key, obj in all_objs:
        if not _is_candidate(obj_id, obj):
            continue
        obj_type = obj.get("type_", None)
        if obj_type and obj_type in ooi_asset_types:
            del_objs[obj_id] = obj

    # Pass 2: associations touching any of those resources; this needs the
    # complete del_objs, hence the separate pass
    for obj_id, key, obj in all_objs:
        if not _is_candidate(obj_id, obj):
            continue
        if obj.get("type_", None) == "Association":
            if obj['o'] in del_objs or obj['s'] in del_objs:
                del_assocs[obj_id] = obj

    for doc in del_objs.values():
        _tombstone(doc)
    for doc in del_assocs.values():
        _tombstone(doc)

    self.resource_ds.update_doc_mult(del_objs.values())
    self.resource_ds.update_doc_mult(del_assocs.values())

    log.info("Deleted %s OOI resources and %s associations", len(del_objs), len(del_assocs))
开发者ID:jamie-cyber1,项目名称:coi-services,代码行数:50,代码来源:ooi_loader.py
示例16: delete_ui
def delete_ui(self):
    """
    Looks up the ids of all resources of the types in self.UI_RESOURCE_TYPES,
    reads their documents from the "resources" datastore, flags every
    document with _deleted and writes them back with one bulk update.
    """
    registry = self.container.resource_registry

    res_ids = []
    for restype in self.UI_RESOURCE_TYPES:
        found_ids, _ = registry.find_resources(restype, id_only=True)
        res_ids.extend(found_ids)

    ds = DatastoreManager.get_datastore_instance("resources")
    docs = ds.read_doc_mult(res_ids)

    for ui_doc in docs:
        ui_doc['_deleted'] = True

    # TODO: Also delete associations
    ds.update_doc_mult(docs)

    log.info("Deleted %s UI resources and associations", len(docs))
开发者ID:Bobfrat,项目名称:coi-services,代码行数:17,代码来源:ui_loader.py
示例17: dump_resources_as_xlsx
def dump_resources_as_xlsx(self, filename=None):
    """
    Reads all documents of the resources datastore, analyzes them and saves
    a workbook with the observatory and network dumps followed by one dump
    per resource type (in sorted type order).
    @param filename Target file; defaults to a timestamped ".xls" file name
        under "interface/"
    """
    self._clear()

    ds = DatastoreManager.get_datastore_instance(DataStore.DS_RESOURCES, DataStore.DS_PROFILE.RESOURCES)
    all_objs = ds.find_docs_by_view("_all_docs", None, id_only=False)
    log.info("Found %s objects in datastore resources", len(all_objs))

    self._analyze_objects(all_objs)

    self._wb = xlwt.Workbook()
    self._worksheets = {}

    self._dump_observatories()
    self._dump_network()

    for restype in sorted(self._res_by_type):
        self._dump_resource_type(restype)

    path = filename
    if not path:
        stamp = datetime.datetime.today().strftime('%Y%m%d_%H%M%S')
        path = "interface/resources_%s.xls" % stamp

    self._wb.save(path)
开发者ID:edwardhunter,项目名称:scioncc,代码行数:22,代码来源:resources.py
示例18: dump_datastore
def dump_datastore(cls, path=None, ds_name=None, clear_dir=True):
    """
    Dumps CouchDB datastores into a directory as YML files.
    @param ds_name Logical name (such as "resources") of an ION datastore;
        if not given, a fixed list of datastores is dumped
    @param path Directory to put dumped datastores into (defaults to
        "res/preload/local/dump_[timestamp]")
    @param clear_dir if True, delete contents of datastore dump dirs
    """
    if CFG.system.mockdb:
        log.warn("Cannot dump from MockDB")
        return

    if not path:
        dtstr = datetime.datetime.today().strftime("%Y%m%d_%H%M%S")
        path = "res/preload/local/dump_%s" % dtstr

    if ds_name:
        if DatastoreManager.exists(ds_name):
            # Output directory first, datastore name second - the same order
            # as the call in the loop below (the two were swapped here before)
            cls._dump_datastore(path, ds_name, clear_dir)
        else:
            log.warn("Datastore does not exist")
    else:
        ds_list = ["resources", "objects", "state", "events", "directory", "scidata"]
        for ds in ds_list:
            cls._dump_datastore(path, ds, clear_dir)
开发者ID:daf,项目名称:coi-services,代码行数:23,代码来源:datastore_loader.py
示例19: UserNotificationService
class UserNotificationService(BaseUserNotificationService):
"""
A service that provides users with an API for CRUD methods for notifications.
"""
def on_start(self):
    """
    Sets up the collaborators of the service: event repository, SMTP client,
    e-mail event processor, configured event types and the service clients.
    """
    # --- Event repository of the container ---
    self.event_repo = self.container.instance.event_repository

    self.smtp_client = setting_up_smtp_client()

    # NOTE(review): this literal looks like a redacted placeholder rather
    # than a real address - confirm the intended sender address
    self.ION_NOTIFICATION_EMAIL_ADDRESS = '[email protected]'

    # --- Event processor that sends e-mail through the SMTP client ---
    self.event_processor = EmailEventProcessor(self.smtp_client)

    # --- Event types from configuration; the event table starts empty ---
    self.event_types = CFG.event.types
    self.event_table = {}

    # --- Clients and helpers ---
    self.discovery = DiscoveryServiceClient()
    self.process_dispatcher = ProcessDispatcherServiceClient()
    self.datastore_manager = DatastoreManager()

    self.event_publisher = EventPublisher()
    self.scheduler_service = SchedulerService()
def on_quit(self):
    """No-op: nothing is done when the service quits."""
def create_notification(self, notification=None, user_id=''):
"""
Persists the provided NotificationRequest object for the specified Origin id.
Associate the Notification resource with the user_id string.
returned id is the internal id by which NotificationRequest will be identified
in the data store.
@param notification NotificationRequest
@param user_id str
@retval notification_id str
@throws BadRequest if object passed has _id or _rev attribute
"""
if not user_id:
raise BadRequest("User id not provided.")
#---------------------------------------------------------------------------------------------------
# Persist Notification object as a resource if it has already not been persisted
#---------------------------------------------------------------------------------------------------
# find all notifications in the system
notifs, _ = self.clients.resource_registry.find_resources(restype = RT.NotificationRequest)
# if the notification has already been registered, simply use the old id
if notification in notifs:
log.warning("Notification object has already been created in resource registry before for another user. No new id to be generated.")
notification_id = notification._id
else:
# since the notification has not been registered yet, register it and get the id
notification_id, _ = self.clients.resource_registry.create(notification)
#-------------------------------------------------------------------------------------------------------------------
# read the registered notification request object because this has an _id and is more useful
#-------------------------------------------------------------------------------------------------------------------
notification = self.clients.resource_registry.read(notification_id)
#-----------------------------------------------------------------------------------------------------------
# Create an event processor for user. This sets up callbacks etc.
# As a side effect this updates the UserInfo object and also the user info and reverse user info dictionaries.
#-----------------------------------------------------------------------------------------------------------
user = self.event_processor.add_notification_for_user(notification_request=notification, user_id=user_id)
#-------------------------------------------------------------------------------------------------------------------
# Allow the indexes to be updated for ElasticSearch
# We publish event only after this so that the reload of the user info works by the
# notification workers work properly
#-------------------------------------------------------------------------------------------------------------------
# todo: This is to allow time for the indexes to be created before publishing ReloadUserInfoEvent for notification workers.
# todo: When things are more refined, it will be nice to have an event generated when the
# indexes are updated so that a subscriber here when it received that event will publish
# the reload user info event.
#.........这里部分代码省略.........
开发者ID:pombredanne,项目名称:coi-services,代码行数:101,代码来源:user_notification_service.py
示例20: test_directory
def test_directory(self):
    """
    Exercises the Directory on a fresh "resources" datastore: lookup,
    register (create and update), unregister, and the find_by_value,
    find_child_entries and find_by_key queries.
    """
    dsm = DatastoreManager()

    # Recreate the datastore so the entry counts below are exact
    ds = dsm.get_datastore("resources")
    ds.delete_datastore()
    ds.create_datastore()

    directory = Directory(datastore_manager=dsm)
    directory.start()

    #self.addCleanup(directory.dir_store.delete_datastore)

    objs = directory.dir_store.list_objects()
    self.assertIn("_design/directory", objs)

    root = directory.lookup("/DIR")
    self.assertIsNotNone(root)

    entry = directory.lookup("/temp")
    self.assertIsNone(entry)

    entry_old = directory.register("/", "temp")
    self.assertEqual(entry_old, None)

    # Create a node
    entry = directory.lookup("/temp")
    self.assertEqual(entry, {})

    # The create case
    entry_old = directory.register("/temp", "entry1", foo="awesome")
    self.assertEqual(entry_old, None)
    entry_new = directory.lookup("/temp/entry1")
    self.assertEqual(entry_new, {"foo": "awesome"})

    # The update case
    entry_old = directory.register("/temp", "entry1", foo="ingenious")
    self.assertEqual(entry_old, {"foo": "awesome"})

    # The delete case
    entry_old = directory.unregister("/temp", "entry1")
    self.assertEqual(entry_old, {"foo": "ingenious"})
    entry_new = directory.lookup("/temp/entry1")
    self.assertEqual(entry_new, None)

    # Two branches with nested entries for the queries below
    directory.register("/BranchA", "X", resource_id="rid1")
    directory.register("/BranchA", "Y", resource_id="rid2")
    directory.register("/BranchA", "Z", resource_id="rid3")
    directory.register("/BranchA/X", "a", resource_id="rid4")
    directory.register("/BranchA/X", "b", resource_id="rid5")
    directory.register("/BranchB", "k", resource_id="rid6")
    directory.register("/BranchB", "l", resource_id="rid7")
    directory.register("/BranchB/k", "m", resource_id="rid7")
    directory.register("/BranchB/k", "X")

    # Search by attribute value below a given parent path
    res_list = directory.find_by_value("/", attribute="resource_id", value="rid3")
    self.assertEqual(len(res_list), 1)
    self.assertEqual(res_list[0].org, "ION")
    self.assertEqual(res_list[0].parent, "/BranchA")
    self.assertEqual(res_list[0].key, "Z")

    res_list = directory.find_by_value("/", attribute="resource_id", value="rid34")
    self.assertEqual(len(res_list), 0)

    res_list = directory.find_by_value("/", attribute="resource_id", value="rid7")
    self.assertEqual(len(res_list), 2)

    res_list = directory.find_by_value("/BranchB", attribute="resource_id", value="rid7")
    self.assertEqual(len(res_list), 2)

    res_list = directory.find_by_value("/Branch", attribute="resource_id", value="rid7")
    self.assertEqual(len(res_list), 2)

    res_list = directory.find_by_value("/BranchB/k", attribute="resource_id", value="rid7")
    self.assertEqual(len(res_list), 1)

    # Child entries, direct only and including deeper levels
    res_list = directory.find_child_entries("/BranchB/k/m")
    self.assertEqual(len(res_list), 0)

    res_list = directory.find_child_entries("/BranchB")
    self.assertEqual(len(res_list), 2)

    res_list = directory.find_child_entries("/BranchB/k/m", direct_only=False)
    self.assertEqual(len(res_list), 0)

    res_list = directory.find_child_entries("/BranchB", direct_only=False)
    self.assertEqual(len(res_list), 4)

    # Search by key, anywhere and below one parent
    res_list = directory.find_by_key("X")
    self.assertEqual(len(res_list), 2)

    res_list = directory.find_by_key("X", parent="/BranchB")
    self.assertEqual(len(res_list), 1)

    directory.stop()
开发者ID:ateranishi,项目名称:pyon,代码行数:93,代码来源:test_directory.py
注:本文中的pyon.datastore.datastore.DatastoreManager类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论