本文整理汇总了Python中zope.event.notify函数的典型用法代码示例。如果您正苦于以下问题:Python notify函数的具体用法?Python notify怎么用?Python notify使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了notify函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: createAndAdd
def createAndAdd(self, data):
domain_model = self.domain_model
# create the object, inspect data for constructor args
try:
ob = createInstance(domain_model, data)
except TypeError:
log.error("Failure: createInstance(%s, %s)", domain_model, data)
probing.log_exc(sys.exc_info(), log_handler=log.error)
ob = domain_model()
# apply any context values
self.finishConstruction(ob)
# apply extra form values
formlib.form.applyChanges(ob, self.form_fields, data, self.adapters)
# set the object in container context, causing autosetting of
# constrained values e.g. one2many attributes, by triggering call to
# _ManagedContainer.constraints.setConstrainedValues()
self.context[""] = ob
# flush so we have database id
Session().flush()
# !+DataError reload form and display this error?
# fire an object created event
notify(ObjectCreatedEvent(ob)) # !+ would set doc_id (if session not flushed) !!
# signal to add form machinery to go to next url
self._finished_add = True
# retrieve the object with location and security information
oid = self.get_oid(ob)
return self.context[oid]
开发者ID:BenoitTalbot,项目名称:bungeni-portal,代码行数:27,代码来源:ui.py
示例2: _do_save
def _do_save(self, data):
formlib.form.applyChanges(self.context, self.form_fields, data)
# !+EVENT_DRIVEN_CACHE_INVALIDATION(mr, mar-2011) no modify event
# invalidate caches for this domain object type
notify(ObjectModifiedEvent(self.context))
#cascade_modifications(self.context)
invalidate_caches_for(self.context.__class__.__name__, "edit")
开发者ID:BenoitTalbot,项目名称:bungeni-portal,代码行数:7,代码来源:common.py
示例3: __call__
def __call__(self, name, content_type, data):
ctr = getToolByName(self.context, 'content_type_registry')
type_ = ctr.findTypeName(name.lower(), '', '') or 'File'
# XXX: quick fix for german umlauts
name = name.decode("utf8")
normalizer = getUtility(IFileNameNormalizer)
chooser = INameChooser(self.context)
# otherwise I get ZPublisher.Conflict ConflictErrors
# when uploading multiple files
upload_lock.acquire()
# this should fix #8
newid = chooser.chooseName(normalizer.normalize(name),
self.context.aq_parent)
try:
transaction.begin()
obj = ploneutils._createObjectByType(type_,
self.context, newid)
mutator = obj.getPrimaryField().getMutator(obj)
mutator(data, content_type=content_type)
obj.setTitle(name)
obj.reindexObject()
notify(ObjectInitializedEvent(obj))
notify(ObjectModifiedEvent(obj))
transaction.commit()
finally:
upload_lock.release()
return obj
开发者ID:cillianderoiste,项目名称:plone.app.widgets,代码行数:33,代码来源:factories.py
示例4: test_document_fusion
def test_document_fusion(self):
# data source and model are in the same content
alsoProvides(self.portal.REQUEST, ICollectiveDocumentfusionLayer)
content = api.content.create(self.portal, type='letter',
title=u"En réponse...",
file=NamedFile(data=open(TEST_LETTER_ODT).read(),
filename=u'letter.odt',
contentType='application/vnd.oasis.opendocument.text'),
sender_name="Thomas Desvenain",
sender_address="57 Quai du Pré Long",
recipient_name="Vincent Fretin",
date=datetime.date(2012, 12, 23))
notify(ObjectModifiedEvent(content))
generated_stream = content.unrestrictedTraverse('@@getdocumentfusion')()
self.assertTrue(generated_stream)
self.assertEqual(self.portal.REQUEST.response['content-type'],
'application/pdf')
generated_path = tempfile.mktemp(suffix='letter.pdf')
generated_file = open(generated_path, 'w')
generated_file.write(generated_stream.read())
generated_file.close()
txt_path = tempfile.mktemp(suffix='letter.pdf')
subprocess.call(['pdftotext', generated_path, txt_path])
txt = open(txt_path).read()
self.assertIn('Vincent Fretin', txt)
self.assertIn('57 Quai du Pré Long', txt)
self.assertIn('2012', txt)
self.assertIn(u'EN RÉPONSE...', txt)
os.remove(txt_path)
os.remove(generated_path)
开发者ID:cedricmessiant,项目名称:collective.documentfusion,代码行数:33,代码来源:test_setup.py
示例5: retract
def retract(self, principal=None):
if principal is None:
principal = getPrincipal()
if not self.isRetractable(principal):
raise DraftException('Cannot retract content.')
container = queryMultiAdapter((principal, self), IDraftContainer)
if container is None:
raise DraftException('Cannot find draft container.')
content = self.context
origName = content.__name__
oldContainer = content.__parent__
newName = INameChooser(container).chooseName(u'', content)
container[newName] = removeAllProxies(content)
del removeAllProxies(oldContainer)[origName]
draft = container[newName]
event.notify(ObjectRetractedEvent(content, draft))
return draft
开发者ID:Zojax,项目名称:zojax.content.draft,代码行数:25,代码来源:contenttype.py
示例6: test_checked_out_docs_arent_listed_twice
def test_checked_out_docs_arent_listed_twice(self, browser):
self.login(self.regular_user, browser=browser)
self._clear_recently_touched_log(self.regular_user.getId())
with freeze(datetime(2018, 4, 30)):
manager = queryMultiAdapter(
(self.document, self.request), ICheckinCheckoutManager)
manager.checkout()
notify(ObjectTouchedEvent(self.document))
url = '%s/@recently-touched/%s' % (
self.portal.absolute_url(), self.regular_user.getId())
browser.open(url, method='GET', headers={'Accept': 'application/json'})
# If a document is both in the log for recently touched objects as
# well as checked out, it must only be listed once, in the
# checked out documents section.
self.assertEqual(200, browser.status_code)
self.assertEquals(
{'checked_out': [{
'icon_class': 'icon-docx is-checked-out-by-current-user',
'last_touched': '2018-04-30T00:00:00+02:00',
'target_url': self.document.absolute_url(),
'title': u'Vertr\xe4gsentwurf'}],
'recently_touched': []},
browser.json)
开发者ID:4teamwork,项目名称:opengever.core,代码行数:28,代码来源:test_recently_touched.py
示例7: testOnItemCreation
def testOnItemCreation(self):
"""Test notification on item creation."""
portal = self.portal
ntool = getToolByName(portal, NTOOL_ID)
changeProperty = lambda key, value: \
ntool.manage_changeProperties(**{key: value})
wtool = getToolByName(portal, 'portal_workflow')
mh = portal.MailHost
self.login('manager')
## Set correct rules so that 3 mails should be sent.
changeProperty('item_creation_notification_enabled', True)
changeProperty('on_item_creation_users', ['* :: *'])
changeProperty('on_item_creation_mail_template',
['* :: string:creation_mail_notification'])
portal.invokeFactory('Document', 'document')
## See 'events/events.txt' for futher details about this
## manually fired event.
event.notify(ObjectInitializedEvent(portal['document']))
self.failUnlessSent(1)
portal.manage_delObjects(['document'])
mh.clearSentList()
## Set workflow initial state to 'publish', thus showing the
## new item to every users.
wtool.simple_publication_workflow.initial_state = 'published'
portal.invokeFactory('Document', 'document')
event.notify(ObjectInitializedEvent(portal['document']))
self.failUnlessSent(3)
portal.manage_delObjects(['document'])
mh.clearSentList()
## Disable notification
changeProperty('item_creation_notification_enabled', False)
portal.invokeFactory('Document', 'document')
event.notify(ObjectInitializedEvent(portal['document']))
self.failUnlessSent(0)
portal.manage_delObjects(['document'])
mh.clearSentList()
## Enable notification but set the notified users list to []
changeProperty('item_creation_notification_enabled', True)
ntool.manage_changeProperties(on_item_creation_users='* :: python: []')
portal.invokeFactory('Document', 'document')
event.notify(ObjectInitializedEvent(portal['document']))
self.failUnlessSent(0)
portal.manage_delObjects(['document'])
mh.clearSentList()
## Set the notified users list to "everybody" but ask for a
## missing mail template
changeProperty('on_item_creation_users', ['* :: *'])
changeProperty('on_item_creation_mail_template',
['* :: string:does_not_exist'])
portal.invokeFactory('Document', 'document')
event.notify(ObjectInitializedEvent(portal['document']))
self.failUnlessSent(0)
portal.manage_delObjects(['document'])
mh.clearSentList()
开发者ID:collective,项目名称:Products.CMFNotification,代码行数:60,代码来源:testNotification.py
示例8: _importNode
def _importNode(self, node):
"""Import the object from the DOM node.
"""
for child in node.childNodes:
# Properties
if child.nodeName == 'property':
name = child.getAttribute('name')
purge = child.getAttribute('purge')
purge = self._convertToBoolean(purge)
elements = []
field = self.context.getField(name)
for element in child.childNodes:
if element.nodeName != 'element':
continue
elements.append(element.getAttribute('value'))
if elements:
if not purge:
value = elements
oldValue = field.getAccessor(self.context)()
value.extend(x for x in oldValue if x not in value)
else:
value = []
else:
value = self._getNodeText(child)
value = value.decode('utf-8')
value = value if not purge else u''
field.getMutator(self.context)(value)
notify(ObjectModifiedEvent(self.context))
self.context.reindexObject()
开发者ID:collective,项目名称:eea.pdf,代码行数:33,代码来源:theme.py
示例9: applyChanges
def applyChanges(self, data):
changes = self.schema.setSchemaData(data)
if changes:
event.notify(ObjectModifiedEvent(self.context, Attributes(IContentSchema, *changes)))
return {IContentSchema: changes}
else:
return {}
开发者ID:Zojax,项目名称:zojax.content.schema,代码行数:7,代码来源:edit.py
示例10: fix_relations
def fix_relations():
relations_catalog = getUtility(ICatalog)
intids = getUtility(IIntIds)
relations = list(relations_catalog.findRelations())
for relation in relations:
from_object = intids.getObject(relation.from_id)
from_attribute = relation.from_attribute
to_id = relation.to_id
attr = getattr(from_object, from_attribute, None)
attr_is_list = isinstance(attr, list)
# remove the broken relation
if attr_is_list:
setattr(from_object, from_attribute,
[x for x in attr if x is not relation])
else:
setattr(from_object, from_attribute, None)
# let the catalog remove the old relation
notify(ObjectModifiedEvent(from_object))
attr = getattr(from_object, from_attribute, None)
# create a new relation
new_relation = RelationValue(to_id)
if attr_is_list:
attr.append(new_relation)
else:
setattr(from_object, from_attribute, new_relation)
# let the catalog know about this new relation
notify(ObjectModifiedEvent(from_object))
开发者ID:ixds,项目名称:plone-virtualcenter,代码行数:35,代码来源:helpers.py
示例11: _create_file
def _create_file(self, item, files, title, description, rights):
namechooser = INameChooser(self.context)
content_type = item.headers.get('Content-Type')
filename = safe_unicode(item.filename)
data = item.read()
id_name = ''
title = title and title[0] or filename
id_name = namechooser.chooseName(title, self.context)
if content_type in IMAGE_MIMETYPES:
portal_type = 'Image'
wrapped_data = NamedBlobImage(data=data, filename=filename)
else:
portal_type = 'File'
wrapped_data = NamedBlobFile(data=data, filename=filename)
self.context.invokeFactory(portal_type,
id=id_name,
title=title,
description=description[0],
rights=rights[0])
newfile = self.context[id_name]
if portal_type == 'File':
if IATFile.providedBy(newfile):
newfile.setFile(data, filename=filename)
else:
newfile.file = wrapped_data
elif portal_type == 'Image':
if IATImage.providedBy(newfile):
newfile.setImage(data, filename=filename)
else:
newfile.image = wrapped_data
newfile.reindexObject()
notify(ObjectModifiedEvent(newfile))
return newfile
开发者ID:dgti-iff,项目名称:brasil.gov.portal,代码行数:35,代码来源:browser.py
示例12: setUp
def setUp(self):
""" """
portal = makerequest(self.layer['portal'])
self.request = portal.REQUEST
alsoProvides(self.request, IPloneintranetDocconvClientLayer)
setRoles(portal, TEST_USER_ID, ('Manager',))
gsettings = GlobalSettings(portal)
self.storage_dir = mkdtemp()
gsettings.storage_location = self.storage_dir
# temporarily disable event handler so that we can test objects without
# previews
from ploneintranet.docconv.client import handlers
_update_preview_images = handlers._update_preview_images
handlers._update_preview_images = lambda obj, event: None
self.workspace = api.content.create(
type='Folder',
title=u"Docconv Workspace",
container=portal)
ff = open(os.path.join(os.path.dirname(__file__), TEST_FILENAME), 'r')
self.filedata = ff.read()
ff.close()
self.testfile = api.content.create(
type='File',
id='test-file',
title=u"Test File",
file=NamedBlobFile(data=self.filedata, filename=TEST_FILENAME),
container=self.workspace)
handlers._update_preview_images = _update_preview_images
event.notify(BeforeTraverseEvent(portal, portal.REQUEST))
开发者ID:cedricmessiant,项目名称:ploneintranet,代码行数:34,代码来源:test_docconv.py
示例13: moveProcess
def moveProcess(self, uid, targetUid):
obj = self._getObject(uid)
target = self._getObject(targetUid)
brainsCollection = []
# reindex all the devices and processes underneath this guy and the target
for org in (obj.getPrimaryParent().getPrimaryParent(), target):
catalog = ICatalogTool(org)
brainsCollection.append(catalog.search(OSProcess))
if isinstance(obj, OSProcessClass):
source = obj.osProcessOrganizer()
source.moveOSProcessClasses(targetUid, obj.id)
newObj = getattr(target.osProcessClasses, obj.id)
elif isinstance(obj, OSProcessOrganizer):
source = aq_parent(obj)
source.moveOrganizer(targetUid, (obj.id,))
newObj = getattr(target, obj.id)
else:
raise Exception('Illegal type %s' % obj.__class__.__name__)
# fire the object moved event for the process instances (will update catalog)
for brains in brainsCollection:
objs = imap(unbrain, brains)
for item in objs:
notify(ObjectMovedEvent(item, item.os(), item.id, item.os(), item.id))
return newObj.getPrimaryPath()
开发者ID:SteelHouseLabs,项目名称:zenoss-prodbin,代码行数:29,代码来源:processfacade.py
示例14: createResponse
def createResponse(self, issue, text='Response text', issueTransition='',
newSeverity=None, newTargetRelease=None,
newResponsibleManager=None, attachment=None):
"""Create a response to the given tracker, and perform workflow and
rename-after-creation initialisation"""
from Products.Poi.browser.response import Create
request = issue.REQUEST
request.form['response'] = text
request.form['transition'] = issueTransition
if newSeverity is not None:
request.form['severity'] = newSeverity
if newTargetRelease is not None:
request.form['targetRelease'] = newTargetRelease
if newResponsibleManager is not None:
request.form['responsibleManager'] = newResponsibleManager
if attachment is not None:
request.form['attachment'] = attachment
create_view = Create(issue, request)
# A response is created by calling this view:
create_view()
container = IResponseContainer(issue)
id = str(len(container) - 1)
response = container[id]
# In tests we need to fire this event manually:
notify(ObjectModifiedEvent(response))
return response
开发者ID:RedTurtle,项目名称:Products.Poi,代码行数:28,代码来源:ptc.py
示例15: __call__
def __call__(self):
if not isCachePurgingEnabled():
return 'Caching not enabled'
notify(Purge(self.context))
return 'Queued'
开发者ID:dokai,项目名称:plone.cachepurging,代码行数:7,代码来源:browser.py
示例16: test_limits_recently_touched_items
def test_limits_recently_touched_items(self, browser):
self.login(self.regular_user, browser=browser)
user_id = self.regular_user.getId()
self._clear_recently_touched_log(user_id)
# Touch a couple documents (more than the current limit)
docs = [self.document, self.private_document, self.expired_document,
self.subdocument, self.taskdocument]
with freeze(datetime(2018, 4, 30)) as freezer:
for doc in docs:
freezer.forward(minutes=1)
notify(ObjectTouchedEvent(doc))
api.portal.set_registry_record(
'limit', 3, IRecentlyTouchedSettings)
url = '%s/@recently-touched/%s' % (
self.portal.absolute_url(), self.regular_user.getId())
browser.open(
url, method='GET', headers={'Accept': 'application/json'})
# Even though the storage contains more logged touched entries, the
# API endpoint should truncate them to the currently defined limit.
self.assertEqual(200, browser.status_code)
recently_touched_list = browser.json['recently_touched']
self.assertEqual(3, len(recently_touched_list))
开发者ID:4teamwork,项目名称:opengever.core,代码行数:29,代码来源:test_recently_touched.py
示例17: test_enabled
def test_enabled(self):
context = FauxContext()
request = FauxRequest()
alsoProvides(request, IAttributeAnnotatable)
setRequest(request)
configlet= CachePurgingConfiglet()
provideUtility(configlet, ICachePurgingConfiglet)
settings = getUtility(ICachePurgingConfiglet)
settings.enabled = True
settings.cachingProxies = ('http://localhost:1234',)
class FauxPurgePaths(object):
implements(IPurgePaths)
adapts(FauxContext)
def __init__(self, context):
self.context = context
def getRelativePaths(self):
return ['/foo', '/bar']
def getAbsolutePaths(self):
return []
provideAdapter(FauxPurgePaths, name="test1")
notify(Purge(context))
self.assertEquals({'zojax.cachepurging.urls': set(['/foo', '/bar'])},
dict(IAnnotations(request)))
开发者ID:Zojax,项目名称:zojax.cachepurging,代码行数:33,代码来源:__test_hooks.py
示例18: handleSave
def handleSave(self, action):
data, errors = self.extractData()
if errors:
self.status = self.formErrorsMessage
return
typeName = self.tileType.__name__
# Traverse to a new tile in the context, with no data
tile = self.context.restrictedTraverse('@@%s/%s' % (typeName, self.tileId,))
dataManager = ITileDataManager(tile)
# We need to check first for existing content in order to not loose
# fields that weren't sent with the form.
old_data = dataManager.get()
for item in data:
if data[item] is not None:
old_data[item] = data[item]
dataManager.set(old_data)
# Look up the URL - we need to do this after we've set the data to
# correctly account for transient tiles
tileURL = absoluteURL(tile, self.request)
notify(ObjectModifiedEvent(tile))
# Get the tile URL, possibly with encoded data
IStatusMessage(self.request).addStatusMessage(_(u"Tile saved",), type=u'info')
self.request.response.redirect(tileURL)
开发者ID:digitalnatif,项目名称:collective.cover,代码行数:30,代码来源:edit.py
示例19: test_request_not_annotatable
def test_request_not_annotatable(self):
request = FauxRequest()
configlet= CachePurgingConfiglet()
provideUtility(configlet, ICachePurgingConfiglet)
settings = getUtility(ICachePurgingConfiglet)
settings.enabled = True
settings.cachingProxies = ('http://localhost:1234',)
class FauxPurger(object):
implements(IPurger)
def __init__(self):
self.purged = []
def purgeAsync(self, url, httpVerb='PURGE'):
self.purged.append(url)
purger = FauxPurger()
provideUtility(purger)
notify(EndRequestEvent(None, request))
self.assertEquals([], purger.purged)
开发者ID:Zojax,项目名称:zojax.cachepurging,代码行数:25,代码来源:__test_hooks.py
示例20: create_application
def create_application(factory, container, name):
"""Creates an application and triggers the events from
the application lifecycle.
"""
# Check the factory.
assert IApplication.implementedBy(factory)
# Check the availability of the name in the container.
if name in container:
raise KeyError(name)
# Instanciate the application
application = factory()
# Trigger the creation event.
notify(ObjectCreatedEvent(application))
# Persist the application.
# This may raise a KeyError.
container[name] = application
# Trigger the initialization event.
notify(ApplicationInitializedEvent(application))
return application
开发者ID:CGTIC,项目名称:Plone_SP,代码行数:25,代码来源:util.py
注:本文中的zope.event.notify函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论