本文整理汇总了Python中zope.component.queryUtility函数的典型用法代码示例。如果您正苦于以下问题:Python queryUtility函数的具体用法?Python queryUtility怎么用?Python queryUtility使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了queryUtility函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_unregister_behavior
def test_unregister_behavior(self):
# Test taxonomy registration
taxonomy = queryUtility(ITaxonomy, name='collective.taxonomy.test')
self.assertIsNotNone(taxonomy)
# Unregister behavior
taxonomy.unregisterBehavior()
# Test behavior registration
behavior = queryUtility(IBehavior, name=taxonomy.getGeneratedName())
self.assertIsNone(behavior)
# Test index creation
pc = api.portal.get_tool('portal_catalog')
self.assertNotIn('taxonomy_test', pc.indexes())
# Test indexer registration
sm = getSiteManager()
indexer = sm._adapter_registrations.get(
((IDexterityContent, IZCatalog),
IIndexer, 'taxonomy_test'),
None)
self.assertIsNone(indexer)
# Test querystring configuration
registry = queryUtility(IRegistry)
self.assertIsNotNone(registry)
prefix = 'plone.app.querystring.field.taxonomy_test'
self.assertRaises(
KeyError, registry.forInterface, IQueryField, prefix=prefix) # noqa
开发者ID:collective,项目名称:collective.taxonomy,代码行数:30,代码来源:test_behavior.py
示例2: __init__
def __init__(self, taskName, configId, scheduleIntervalSeconds=60, taskConfig=None):
"""
@param deviceId: the Zenoss deviceId to watch
@type deviceId: string
@param taskName: the unique identifier for this task
@type taskName: string
@param scheduleIntervalSeconds: the interval at which this task will be
collected
@type scheduleIntervalSeconds: int
@param taskConfig: the configuration for this task
"""
super(NmapPingTask, self).__init__(taskName, configId, scheduleIntervalSeconds, taskConfig=None)
# Needed for interface
self.name = taskName
self.configId = configId
self.state = TaskStates.STATE_IDLE
self.interval = scheduleIntervalSeconds
if taskConfig is None:
raise TypeError("taskConfig cannot be None")
self._preferences = taskConfig
self._daemon = component.getUtility(ZenCollector.interfaces.ICollector)
self._dataService = component.queryUtility(ZenCollector.interfaces.IDataService)
self._eventService = component.queryUtility(ZenCollector.interfaces.IEventService)
self._pings = 0
self._nmapPresent = False # assume nmap is not present at startup
self._nmapIsSuid = False # assume nmap is not SUID at startup
self._cycleIntervalReasonable = True # assume interval is fine at startup
self.collectorName = self._daemon._prefs.collectorName
# maps task name to ping down count and time of last ping down
self._down_counts = defaultdict(lambda: (0, None))
开发者ID:damilare,项目名称:zenoss-prodbin,代码行数:35,代码来源:NmapPingTask.py
示例3: register
def register(fti):
"""Helper method to:
- register an FTI as a local utility
- register a local factory utility
- register an add view
"""
fti = aq_base(fti) # remove acquisition wrapper
site = getUtility(ISiteRoot)
site_manager = getSiteManager(site)
portal_type = fti.getId()
fti_utility = queryUtility(IDexterityFTI, name=portal_type)
if fti_utility is None:
site_manager.registerUtility(
fti,
IDexterityFTI,
portal_type,
info='plone.dexterity.dynamic'
)
factory_utility = queryUtility(IFactory, name=fti.factory)
if factory_utility is None:
site_manager.registerUtility(
DexterityFactory(portal_type),
IFactory,
fti.factory,
info='plone.dexterity.dynamic'
)
开发者ID:adam139,项目名称:plone.dexterity,代码行数:31,代码来源:fti.py
示例4: test_enabled_uninstalled
def test_enabled_uninstalled(self):
dsb = DummyStorageBackend()
getSiteManager().registerUtility(
dsb, IStorageBackend, name='dummy_storage')
utilities = queryUtility(IUtilityRegistry, 'repodono.storage.backends')
utilities.enable('dummy_storage')
vocab = queryUtility(
IVocabularyFactory, name='repodono.storage.backends')(None)
self.assertEqual(vocab.getTermByToken(
'dummy_storage').token, 'dummy_storage')
getSiteManager().unregisterUtility(
dsb, IStorageBackend, name='dummy_storage')
vocab = queryUtility(
IVocabularyFactory, name='repodono.storage.backends')(None)
self.assertEqual(list(vocab), [])
# registry should be untouched at this point
registry = queryUtility(IRegistry)
self.assertEqual(
registry['repodono.storage.backends'], [u'dummy_storage'])
# shouldn't break anything.
utilities.disable('unrelated')
self.assertEqual(list(vocab), [])
# The bad value is no longer stored.
self.assertEqual(registry['repodono.storage.backends'], [])
开发者ID:repodono,项目名称:repodono.storage,代码行数:31,代码来源:test_utilities.py
示例5: __init__
def __init__(self, fti=None):
if not fti:
return
sql_connection = queryUtility(ISQLBaseConnectionUtility, name=fti.sql_connection, default=None)
if sql_connection:
self.connection_name = sql_connection.name
else:
sql_connection = queryUtility(ISQLBaseConnectionUtility, name=fti.sql_table, default=None)
if sql_connection:
self.connection_name = sql_connection.name
else:
processor = SQLBaseConnectionUtility(fti)
connection_name = processor.name
LOG.info('Base connection utility registered as '+connection_name)
gsm = getGlobalSiteManager()
gsm.registerUtility(processor, ISQLBaseConnectionUtility, name=connection_name)
self.connection_name = connection_name
self.sql_table = fti.sql_table
self.factory = fti.factory
self.sql_id_column = getattr(fti, 'sql_id_column', None) and getattr(fti, 'sql_id_column', None) or 'id'
fieldnames = {}
for field_name, field in schema.getFieldsInOrder( fti.lookupSchema() ):
if getattr(field, 'sql_column', None):
sql_column = getattr(field, 'sql_column', None)
fieldnames[field_name] = sql_column
for line in getattr(fti, 'sql_fields_columns', []):
fieldnames[line.split(':')[0]] = line.split(':')[1]
self.fieldnames = fieldnames
开发者ID:Martronic-SA,项目名称:collective.behavior.sql,代码行数:28,代码来源:content.py
示例6: setupGenericImage
def setupGenericImage(site):
""" Add generic image within portal_depiction if it doesn't exists
"""
tool = queryUtility(IDepictionTool, context=site)
tool = tool.__of__(site)
if 'generic' in tool.objectIds():
return
img = site.restrictedTraverse(
'++resource++eea.depiction.images/generic.jpg')
data = img.GET()
# needed for tests
storage = queryUtility(IStorage, name="__builtin__.str")
if storage is None:
from plone.namedfile.storages import StringStorable
provideUtility(StringStorable(), IStorage, name="__builtin__.str")
image = NamedBlobImage(data=data, contentType="image/jpeg",
filename=u"generic.jpg")
id = tool.invokeFactory('Image', id='generic', title='Generic')
obj = tool._getOb(id)
if IBaseObject.providedBy(obj):
obj.edit(image=image)
else:
obj.image = image
开发者ID:collective,项目名称:eea.depiction,代码行数:30,代码来源:setuphandlers.py
示例7: normalize
def normalize(self, text, locale=None, max_length=MAX_LENGTH):
"""
Returns a normalized text. text has to be a unicode string and locale
should be a normal locale, for example: 'pt-BR', '[email protected]' or 'de'
"""
if locale is not None:
# Try to get a normalizer for the locale
util = queryUtility(IIDNormalizer, name=locale)
parts = LOCALE_SPLIT_REGEX.split(locale)
if util is None and len(parts) > 1:
# Try to get a normalizer for the base language if we asked
# for one for a language/country combination and found none
util = queryUtility(IIDNormalizer, name=parts[0])
# be defensive: if queryUtility() returns an instance of the same
# normalizer class as this one, we'll loop forever until
# "RuntimeError: maximum recursion depth exceeded" (ticket #11630)
if util is not None and util.__class__ is not self.__class__:
text = util.normalize(text, locale=locale)
text = baseNormalize(text)
# lowercase text
text = text.lower()
text = IGNORE_REGEX.sub('', text)
text = NON_WORD_REGEX.sub('-', text)
text = MULTIPLE_DASHES_REGEX.sub('-', text)
text = EXTRA_DASHES_REGEX.sub('', text)
return cropName(text, maxLength=max_length)
开发者ID:Vinsurya,项目名称:Plone,代码行数:30,代码来源:__init__.py
示例8: updateRelations
def updateRelations(obj, event):
"""Re-register relations, after they have been changed.
"""
catalog = component.queryUtility(ICatalog)
intids = component.queryUtility(IIntIds)
if catalog is None or intids is None:
return
# check that the object has an intid, otherwise there's nothing to be done
try:
obj_id = intids.getId(obj)
except KeyError:
# The object has not been added to the ZODB yet
return
# remove previous relations coming from id (now have been overwritten)
# have to activate query here with list() before unindexing them so we don't
# get errors involving buckets changing size
rels = list(catalog.findRelations({'from_id': obj_id}))
for rel in rels:
catalog.unindex(rel)
# add new relations
addRelations(obj, event)
开发者ID:CGTIC,项目名称:Plone_SP,代码行数:25,代码来源:event.py
示例9: purge
def purge(event):
"""Asynchronously send PURGE requests
"""
request = event.request
annotations = IAnnotations(request, None)
if annotations is None:
return
paths = annotations.get(KEY, None)
if paths is None:
return
registry = queryUtility(IRegistry)
if registry is None:
return
if not isCachePurgingEnabled(registry=registry):
return
purger = queryUtility(IPurger)
if purger is None:
return
settings = registry.forInterface(ICachePurgingSettings, check=False)
for path in paths:
for url in getURLsToPurge(path, settings.cachingProxies):
purger.purgeAsync(url)
开发者ID:dokai,项目名称:plone.cachepurging,代码行数:29,代码来源:hooks.py
示例10: setupVarious
def setupVarious(context):
""" Custom setup """
if context.readDataFile('scoreboard.visualization.txt') is None:
return
ds = queryUtility(IDavizSettings)
if not ds.disabled('daviz.properties', 'ScoreboardVisualization'):
logger.info('Disabling Daviz Properties for ScoreboardVisualization')
ds.settings.setdefault('forbidden.daviz.properties', [])
ds.settings['forbidden.daviz.properties'].append(
'ScoreboardVisualization')
ptool = queryUtility(IPropertiesTool)
if not getattr(ptool, 'scoreboard_properties', None):
ptool.manage_addPropertySheet(
'scoreboard_properties', 'Scoreboard Properties')
stool = getattr(ptool, 'scoreboard_properties', None)
eu = stool.getProperty('EU', None)
if not eu:
default = json.dumps(EU, indent=2)
stool.manage_addProperty('EU', default, 'text')
whitelist = stool.getProperty('WHITELIST', None)
if not whitelist:
default = json.dumps(WHITELIST, indent=2)
stool.manage_addProperty('WHITELIST', default, 'text')
开发者ID:digital-agenda-data,项目名称:scoreboard.visualization,代码行数:28,代码来源:setuphandlers.py
示例11: queryResourceDirectory
def queryResourceDirectory(type, name):
"""Find the IResourceDirectory of the given name and type. Returns
None if not found.
"""
# 1. Persistent resource directory:
# Try (persistent resource directory)/$type/$name
res = queryUtility(IResourceDirectory, name=u'persistent')
if res:
try:
return res[type][name]
except (KeyError, NotFound,):
pass # pragma: no cover
# 2. Global resource directory:
# Try (global resource directory)/$type/$name
res = queryUtility(IResourceDirectory, name=u'')
if res:
try:
return res[type][name]
except (KeyError, NotFound,):
pass # pragma: no cover
# 3. Packaged type-specific resource directory:
# Try (directory named after type + name)
identifier = u'++%s++%s' % (type, name)
res = queryUtility(IResourceDirectory, name=identifier)
if res is not None:
return res
return None
开发者ID:Vinsurya,项目名称:Plone,代码行数:31,代码来源:utils.py
示例12: generate
def generate(self):
"""the rendered feed.
@return: tuple of data and mimetype.
"""
producer = queryUtility(IFeedSkeletonProducer, name=self.name)
if producer is None:
return None, None
tree = producer()
named_modifiers = list(getAdapters((self.feed, tree), IFeedModifier))
named_modifiers.sort(key=operator.itemgetter(0))
namespaces = Set()
for name, modifier in named_modifiers:
ns = modifier.modify()
namespaces.update(ns)
mimetype = IMimeTypeLookup(tree)
prefixmap = {}
for ns in namespaces:
prefix = queryUtility(INamespacePrefix, name=ns)
if prefix is None:
continue
prefixmap[ns] = prefix
writer = XMLWriter(tree, prefixmap)
result = writer(), mimetype
return result
开发者ID:bluedynamics,项目名称:cornerstone.feed.core,代码行数:25,代码来源:feedgenerator.py
示例13: test_global_components_not_unregistered_on_delete
def test_global_components_not_unregistered_on_delete(self):
portal_type = u"testtype"
fti = DexterityFTI(portal_type)
container_dummy = self.create_dummy()
# Mock the lookup of the site and the site manager at the site root
dummy_site = self.create_dummy()
self.mock_utility(dummy_site, ISiteRoot)
site_manager_mock = self.mocker.proxy(PersistentComponents(bases=(getGlobalSiteManager(),)))
getSiteManager_mock = self.mocker.replace("zope.component.hooks.getSiteManager")
self.expect(getSiteManager_mock(dummy_site)).result(site_manager_mock)
# Register FTI utility and factory utility
self.mock_utility(fti, IDexterityFTI, name=portal_type)
self.mock_utility(DexterityFactory(portal_type), IFactory, name=portal_type)
# We expect to always be able to unregister without error, even if the
# component exists. The factory is only unregistered if it was registered
# with info='plone.dexterity.dynamic'.
self.expect(site_manager_mock.unregisterUtility(provided=IDexterityFTI, name=portal_type)).passthrough()
self.replay()
ftiRemoved(fti, ObjectRemovedEvent(fti, container_dummy, fti.getId()))
site_dummy = self.create_dummy(getSiteManager=lambda: site_manager_mock)
setSite(site_dummy)
setHooks()
self.assertNotEquals(None, queryUtility(IDexterityFTI, name=portal_type))
self.assertNotEquals(None, queryUtility(IFactory, name=portal_type))
开发者ID:numahell,项目名称:plone.dexterity,代码行数:34,代码来源:test_fti.py
示例14: test_components_unregistered_on_delete
def test_components_unregistered_on_delete(self):
portal_type = u"testtype"
fti = DexterityFTI(portal_type)
container_dummy = self.create_dummy()
# Mock the lookup of the site and the site manager at the site root
dummy_site = self.create_dummy()
self.mock_utility(dummy_site, ISiteRoot)
site_manager_mock = self.mocker.proxy(PersistentComponents(bases=(getGlobalSiteManager(),)))
getSiteManager_mock = self.mocker.replace("zope.component.hooks.getSiteManager")
self.expect(getSiteManager_mock(dummy_site)).result(site_manager_mock).count(1, None)
# We expect to always be able to unregister without error, even if the
# components do not exists (as here)
self.expect(site_manager_mock.unregisterUtility(provided=IDexterityFTI, name=portal_type)).passthrough()
self.expect(site_manager_mock.unregisterUtility(provided=IFactory, name=portal_type)).passthrough()
self.replay()
# First add the components
ftiAdded(fti, ObjectAddedEvent(fti, container_dummy, fti.getId()))
# Then remove them again
ftiRemoved(fti, ObjectRemovedEvent(fti, container_dummy, fti.getId()))
site_dummy = self.create_dummy(getSiteManager=lambda: site_manager_mock)
setSite(site_dummy)
setHooks()
self.assertEquals(None, queryUtility(IDexterityFTI, name=portal_type))
self.assertEquals(None, queryUtility(IFactory, name=portal_type))
开发者ID:numahell,项目名称:plone.dexterity,代码行数:33,代码来源:test_fti.py
示例15: reindex
def reindex(self, batch=1000, skip=0):
""" find all contentish objects (meaning all objects derived from one
of the catalog mixin classes) and (re)indexes them """
requestFactory = queryUtility(IRequestFactory)
indexProcessor = queryUtility(IZeroCMSIndexQueueProcessor, name="zerocms")
zodb_conn = self.context._p_jar
log = self.mklog()
log('reindexing documents to ZeroCMS...\n')
if skip:
log('skipping indexing of %d object(s)...\n' % skip)
real = timer() # real time
lap = timer() # real lap time (for intermediate commits)
cpu = timer(clock) # cpu time
processed = 0
updates = {} # list to hold data to be updated
count = 0
for path, obj in findObjects(self.context):
if indexable(obj):
if getOwnIndexMethod(obj, 'indexObject') is not None:
log('skipping indexing of %r via private method.\n' % obj)
continue
count += 1
if count <= skip:
continue
indexProcessor.index(obj)
processed += 1
zodb_conn.cacheGC();
log('All documents exported to ZeroCMS.\n')
msg = 'processed %d items in %s (%s cpu time).'
msg = msg % (processed, real.next(), cpu.next())
log(msg)
logger.info(msg)
开发者ID:zeronorge,项目名称:Products.zerocms,代码行数:32,代码来源:maintenance.py
示例16: getObject
def getObject(self, REQUEST=None):
path = self.getPath().split('/')
if not path:
return None
parent = aq_parent(self)
if (aq_get(parent, 'REQUEST', None) is None
and _GLOBALREQUEST_INSTALLED and _REQUESTCONTAINER_EXISTS):
request = getRequest()
if request is not None:
# path should be absolute, starting at the physical root
parent = self.getPhysicalRoot()
request_container = RequestContainer(REQUEST=request)
parent = aq_base(parent).__of__(request_container)
if len(path) > 1:
try:
parent = parent.unrestrictedTraverse(path[:-1])
except:
if path[:-2] == 'data-'+self.portal_type:
parent = queryMultiAdapter((None, ICollectiveBehaviorSQLLayer), IBrowserView, name='data-'+name, default=None)
try:
return parent.restrictedTraverse(path[-1])
except:
connection = queryUtility(ISQLConnectionsUtility, name=self.portal_type, default=None)
if connection == None and self.portal_type:
fti = queryUtility(IDexterityFTI, name=self.portal_type, default=None)
if not fti:
return None
updateConnectionsForFti(fti)
connection = queryUtility(ISQLConnectionsUtility, name=self.portal_type, default=None)
return connection.getVirtualItem(self.sql_id, context=parent)
开发者ID:Martronic-SA,项目名称:collective.behavior.sql,代码行数:30,代码来源:monkeypatch.py
示例17: publishTraverse
def publishTraverse(self, request, name):
if not self.fti_id:
return super(SQLDexterityPublishTraverse, self).publishTraverse(request, name)
connection = queryUtility(ISQLConnectionsUtility, name=self.fti_id, default=None)
if connection == None and self.portal_type:
fti = queryUtility(IDexterityFTI, name=self.fti_id, default=None)
if not fti:
return None
updateConnectionsForFti(self.fti)
connection = getUtility(ISQLConnectionsUtility, name=self.fti_id)
name = name.split('/')[0]
name = name.split('++')[0]
name = name.split('@@')[0]
sql_id_column = self.fti.sql_id_column
factory_utility = queryUtility(IFactory, name=self.fti.factory)
catalog = getToolByName(getSite(), 'portal_catalog')
results = catalog.unrestrictedSearchResults(portal_type=self.fti_id, id=name)
if results:
sql_id = results[0].sql_id
else:
sql_id = name
try:
sql_items = connection.query(id=sql_id)
except:
sql_items = []
if sql_items:
sql_item = sql_items[0]
sql_item_id = getattr(sql_item, sql_id_column, False)
item = factory_utility(sql_id=sql_item_id)
item.sql_virtual = True
return item.__of__(self.context)
return super(SQLDexterityPublishTraverse, self).publishTraverse(request, name)
开发者ID:Martronic-SA,项目名称:collective.behavior.sql,代码行数:32,代码来源:content.py
示例18: revisionfiles
def revisionfiles(self, unrestricted=False):
""" All the revision files, ordered """
if unrestricted:
catalog = getMultiAdapter((self.context, self.context.REQUEST), name=u'plone_tools').catalog()
items = catalog.unrestrictedSearchResults(**{'object_provides':IRevisionFile.__identifier__,
'sort_order':'getId',
'path':'/'.join(self.context.getPhysicalPath())})
else:
items = self.context.getFolderContents({'object_provides':IRevisionFile.__identifier__,
'sort_order':'getId',
})
items = sorted(items, key=attrgetter('effective'), reverse=True)
if items:
portal_type = items[0].portal_type
priority_utility = queryUtility(IRevisionWorkflowUtility, name=portal_type)
priority_utility = not priority_utility and queryUtility(IRevisionWorkflowUtility)
if priority_utility:
priority_map = priority_utility.priority_map()
try:
items = sorted(items, key=lambda x: (priority_map.get(x.review_state, {}).get('priority'), x.effective))
except TypeError:
# Missing.value on items[0].review_state
pass
items.reverse()
if unrestricted:
return [item._unrestrictedGetObject() for item in items]
else:
return [item.getObject() for item in items]
开发者ID:redomino,项目名称:redomino.revision,代码行数:29,代码来源:revision.py
示例19: setupDefaultImages
def setupDefaultImages(site):
""" Move images from valentine-imagescales
"""
if ('valentine-imagescales' not in site.objectIds() or
IDepictionTool.providedBy(site['valentine-imagescales'])):
return setupGenericImage(site)
valentine = site['valentine-imagescales']
tool = queryUtility(IDepictionTool)
for image in valentine.objectIds():
if image not in tool.objectIds():
cb = valentine.manage_cutObjects(image)
tool.manage_pasteObjects(cb)
oldUrl = "/".join(valentine.getPhysicalPath())
site.manage_delObjects(['valentine-imagescales'])
# Add alias
storage = queryUtility(IRedirectionStorage)
storage.add(oldUrl, '/'.join(tool.getPhysicalPath()))
# Setup generic image
setupGenericImage(site)
开发者ID:collective,项目名称:eea.depiction,代码行数:26,代码来源:setuphandlers.py
示例20: get_plone_members
def get_plone_members(self):
""" return filtered list of plone members as DisplayList
"""
global fmp_tool
if fmp_tool and queryUtility(IFastmemberpropertiesTool, 'fastmemberproperties_tool'):
log.debug("Use fastmemberpropertiestool to get memberproperties!")
fmp_tool = queryUtility(IFastmemberpropertiesTool, 'fastmemberproperties_tool')
member_properties = fmp_tool.get_all_memberproperties()
else:
log.info("We use plone API to get memberproperties, this is very \
slow on many members, please install inqbus.plone.fastmemberproperties to make it fast!")
acl_userfolder = getToolByName(self, 'acl_users')
member_objs = acl_userfolder.getUsers()
member_properties = {}
for member in member_objs:
probdict = {}
probdict['id'] = member.getUserId()
probdict['email'] = member.getProperty('email')
probdict['fullname'] = safe_unicode(member.getProperty('fullname'))
member_properties[probdict['id']] = probdict
if not member_properties:
return []
try:
results = atapi.DisplayList([(id, property['fullname'] + ' - ' + property['email'])
for id, property in member_properties.items()
if config.EMAIL_RE.findall(property['email'])])
except TypeError, e:
log.error(":get_plone_members: error in member_properties %s/ \
properties:'%s'" % (e, member_properties.items()))
开发者ID:agnogueira,项目名称:Products.EasyNewsletter,代码行数:29,代码来源:EasyNewsletter.py
注:本文中的zope.component.queryUtility函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论