本文整理汇总了Python中pyramid.threadlocal.get_current_registry函数的典型用法代码示例。如果您正苦于以下问题:Python get_current_registry函数的具体用法?Python get_current_registry怎么用?Python get_current_registry使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_current_registry函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __delitem__
def __delitem__(self, item, flush=True):
"""Delete a value from the container using the key."""
if isinstance(item, string_types):
item = self[item]
if item.__parent_uri__ == self.__uri__:
if isinstance(item, BaseContainer):
for key in item.keys():
item.__delitem__(key, False)
get_current_registry().notify(
ptah.events.ContentDeletingEvent(item))
name = item.__name__
if self._v_keys:
self._v_keys.remove(name)
if self._v_items and name in self._v_items:
del self._v_items[name]
if item in Session:
try:
Session.delete(item)
if flush:
Session.flush()
except:
pass
return
raise KeyError(item)
开发者ID:runyaga,项目名称:ptah,代码行数:31,代码来源:container.py
示例2: get_searchable_text
def get_searchable_text(context, default):
root = find_root(context)
catalog = root.catalog
registry = get_current_registry()
discriminators = list(getattr(registry, 'searchable_text_discriminators', ()))
results = set()
registry = get_current_registry()
for index in registry.catalog_indexhelper['searchable_text'].linked:
if index not in catalog: #pragma: no coverage
# In case a bad name was linked in searchable_text, no reason to die because of it.
continue
disc = catalog[index].discriminator
if isinstance(disc, string_types):
attr_discriminator = _AttrDiscriminator(disc)
discriminators.append(attr_discriminator)
else:
discriminators.append(catalog[index].discriminator)
for discriminator in discriminators:
res = discriminator(context, default)
if res is default:
continue
if not isinstance(res, string_types):
res = str(res)
res = res.strip()
if res:
results.add(res)
text = " ".join(results)
text = text.strip()
return text and text or default
开发者ID:ArcheProject,项目名称:Arche,代码行数:29,代码来源:catalog.py
示例3: test_settings
def test_settings(self):
browser = self.login_testbrowser()
ctrl = browser.getControl
browser.getLink(u'Calendar').click()
ctrl('Title').value = u'Calendar'
ctrl(name=u'save').click()
for c in range(6):
browser.getLink(u'Calendar').click()
browser.getLink(u'Event').click()
ctrl('Title').value = u'Event %d' % c
ctrl('Start').value = u'2112-08-23 20:00:00'
ctrl(name=u'save').click()
browser.open(self.BASE_URL)
assert u"Event 5" not in browser.contents
settings = get_current_registry().settings
settings['kotti_calendar.upcoming_events_widget.events_count'] = u'nan'
browser.open(self.BASE_URL)
assert u"Event 5" not in browser.contents
settings = get_current_registry().settings
settings['kotti_calendar.upcoming_events_widget.events_count'] = u'7'
browser.open(self.BASE_URL)
assert u"Event 5" in browser.contents
开发者ID:ferewuz,项目名称:kotti_calendar,代码行数:26,代码来源:test_widgets.py
示例4: __delitem__
def __delitem__(self, item, flush=True):
"""Delete a value from the container using the key."""
if isinstance(item, string_types):
item = self[item]
if item.__parent_uri__ == self.__uri__:
if isinstance(item, BaseContainer):
for key in item.keys():
item.__delitem__(key, False)
get_current_registry().notify(
ptah.events.ContentDeletingEvent(item))
Session = ptah.get_session().object_session(item)
if item in Session:
try:
Session.delete(item)
if flush:
Session.flush()
except:
pass
return
raise KeyError(item)
开发者ID:djedproject,项目名称:ptahcms,代码行数:26,代码来源:container.py
示例5: update
def update(self):
result = {}
text_analyzer = get_current_registry().getUtility(
ITextAnalyzer,
'text_analyzer')
amendment_viewer = get_current_registry().getUtility(
IAmendmentViewer,
'amendment_viewer')
souptextdiff, explanations = amendment_viewer.get_explanation_diff(
self.context, self.request)
amendment_viewer.add_details(explanations,
self.context,
self.request,
souptextdiff,
self.readonly_explanation_template)
text_diff = text_analyzer.soup_to_text(souptextdiff)
not_published_ideas = [i for i in self.context.get_used_ideas() \
if not('published' in i.state)]
values = {'context': self.context,
'explanationtext': text_diff,
'ideas': not_published_ideas}
body = self.content(result=values, template=self.template)['body']
item = self.adapt_item(body, self.viewid)
result['coordinates'] = {self.coordinates:[item]}
return result
开发者ID:jean,项目名称:nova-ideo,代码行数:25,代码来源:submit.py
示例6: __acl__
def __acl__(self):
acl = self.admins() + self.viewers() + self.editors() + [DENY_ALL]
get_current_registry().notify(ACLRequest(acl, self.context))
return acl
开发者ID:wyldebeast-wunderliebe,项目名称:w20e.pycms,代码行数:7,代码来源:security.py
示例7: notify_role_for_acceptance
def notify_role_for_acceptance(user_id, requester, model):
"""Notify the given ``user_id`` on ``model`` that s/he has been
assigned a role on said model and can now accept the role.
"""
accounts = get_current_registry().getUtility(IOpenstaxAccounts)
settings = get_current_registry().settings
base_url = settings['webview.url']
link = urlparse.urljoin(base_url, '/users/role-acceptance/{}'
.format(model.id))
subject = 'Requesting action on OpenStax CNX content'
body = '''\
Hello {name},
{requester} added you to content titled {title}.
Please go to the following link to accept your roles and license:
{link}
Thank you from your friends at OpenStax CNX
'''.format(name=user_id,
requester=requester,
title=model.metadata['title'],
link=link)
try:
accounts.send_message(user_id, subject, body)
except urllib2.HTTPError:
# Can't send messages via accounts for some reason - should be async!
logger.warning("Failed sending notification message to {}"
.format(user_id))
pass
开发者ID:Connexions,项目名称:cnx-authoring,代码行数:30,代码来源:utils.py
示例8: test_settings
def test_settings(webtest, root):
from kotti_calendar.resources import Calendar
from kotti_calendar.resources import Event
root['calendar'] = calendar = Calendar(title=u'Calendar')
for c in range(6):
calendar['event-%d' % c] = Event(
title=u'Event %d' % c,
start=datetime(2112, 8, 23, c, 0, 0))
transaction.commit()
resp = webtest.get('/')
assert u"Event 5" not in resp.body
settings = get_current_registry().settings
settings['kotti_calendar.upcoming_events_widget.events_count'] = u'nan'
resp = webtest.get('/')
assert u"Event 5" not in resp.body
settings = get_current_registry().settings
settings['kotti_calendar.upcoming_events_widget.events_count'] = u'7'
resp = webtest.get('/')
assert u"Event 5" in resp.body
开发者ID:Kotti,项目名称:kotti_calendar,代码行数:26,代码来源:test_widgets.py
示例9: get_text_amendment_diff_submitted
def get_text_amendment_diff_submitted(amendment, request):
text_analyzer = get_current_registry().getUtility(
ITextAnalyzer, 'text_analyzer')
amendment_viewer = get_current_registry().getUtility(
IAmendmentViewer, 'amendment_viewer')
souptextdiff, explanations = amendment_viewer.get_explanation_diff(
amendment, request)
amendment_viewer.add_details(explanations, amendment, request, souptextdiff)
return explanations, text_analyzer.soup_to_text(souptextdiff)
开发者ID:jean,项目名称:nova-ideo,代码行数:9,代码来源:behaviors.py
示例10: get_settings
def get_settings():
from kotti.resources import Settings
session = DBSession()
db_settings = session.query(Settings).order_by(desc(Settings.id)).first()
if db_settings is not None:
reg_settings = dict(get_current_registry().settings)
reg_settings.update(db_settings.data)
return reg_settings
else:
return get_current_registry().settings
开发者ID:mr-krille,项目名称:Kotti,代码行数:10,代码来源:__init__.py
示例11: update
def update(self, **data):
if self.__type__:
tinfo = self.__type__
for field in tinfo.fieldset.fields():
val = data.get(field.name, field.default)
if val is not form.null:
setattr(self, field.name, val)
get_current_registry().notify(
ptah.events.ContentModifiedEvent(self))
开发者ID:djedproject,项目名称:ptahcms,代码行数:11,代码来源:content.py
示例12: uncheckedDatas_graph
def uncheckedDatas_graph(request):
session = request.dbsession
viewArgos = Base.metadata.tables["VArgosData_With_EquipIndiv"]
queryArgos = (
select([viewArgos.c["type"].label("type"), func.count("*").label("nb")])
.where(viewArgos.c["checked"] == 0)
.group_by(viewArgos.c["type"])
)
viewGSM = Base.metadata.tables["VGSMData_With_EquipIndiv"]
queryGSM = select([func.count("*").label("nb")]).where(viewGSM.c["checked"] == 0)
queryRFID = select([func.count("*").label("nb")]).where(Rfid.checked == 0)
data = []
session1 = threadlocal.get_current_registry().dbmaker()
session2 = threadlocal.get_current_registry().dbmaker()
session3 = threadlocal.get_current_registry().dbmaker()
global graphDataDate
global pendingSensorData
d = datetime.datetime.now() - datetime.timedelta(days=1)
if graphDataDate["pendingSensorData"] is None or graphDataDate["pendingSensorData"] < d:
graphDataDate["pendingSensorData"] = datetime.datetime.now()
argosData = session1.execute(queryArgos).fetchall()
for row in argosData:
curRow = OrderedDict(row)
lab = curRow["type"].upper()
if lab == "ARG":
lab = "ARGOS"
data.append({"value": curRow["nb"], "label": lab})
for row in session2.execute(queryGSM).fetchall():
curRow = OrderedDict(row)
data.append({"value": curRow["nb"], "label": "GSM"})
for row in session3.execute(queryRFID).fetchall():
curRow = OrderedDict(row)
data.append({"value": curRow["nb"], "label": "RFID"})
data.sort(key=itemgetter("label"))
pendingSensorData = data
else:
print("unchecked data already fetched")
session1.close()
session2.close()
session3.close()
return pendingSensorData
开发者ID:gerald13,项目名称:ecoReleve-Data,代码行数:53,代码来源:statistics.py
示例13: after_request
def after_request(response):
if flask.request.method == 'OPTIONS':
return response
if 200 <= response.status_code < 300:
match = re.match(r'^store\.(\w+)_annotation$', flask.request.endpoint)
if match:
action = match.group(1)
if action != 'delete':
annotation = json.loads(response.data)
event = events.AnnotatorStoreEvent(annotation, action)
get_current_registry().notify(event)
return response
开发者ID:abigailricarte,项目名称:h,代码行数:13,代码来源:store.py
示例14: uncheckedDatas_graph
def uncheckedDatas_graph(request):
session = request.dbsession
viewArgos = Base.metadata.tables['VArgosData_With_EquipIndiv']
queryArgos= select([viewArgos.c['type'].label('type'),func.count('*').label('nb')]
).where(viewArgos.c['checked'] == 0
).group_by(viewArgos.c['type'])
viewGSM = Base.metadata.tables['VGSMData_With_EquipIndiv']
queryGSM= select([func.count('*').label('nb')]
).where(viewGSM.c['checked'] == 0)
queryRFID = select([func.count('*').label('nb')]
).where(Rfid.checked == 0)
data = []
session1 = threadlocal.get_current_registry().dbmaker()
session2 = threadlocal.get_current_registry().dbmaker()
session3 = threadlocal.get_current_registry().dbmaker()
global graphDataDate
global pendingSensorData
d = datetime.datetime.now() - datetime.timedelta(days=1)
if graphDataDate['pendingSensorData'] is None or graphDataDate['pendingSensorData'] < d :
graphDataDate['pendingSensorData'] = datetime.datetime.now()
argosData = session1.execute(queryArgos).fetchall()
for row in argosData:
curRow = OrderedDict(row)
lab = curRow['type'].upper()
if lab == 'ARG':
lab = 'ARGOS'
data.append({'value':curRow['nb'],'label':lab})
for row in session2.execute(queryGSM).fetchall() :
curRow = OrderedDict(row)
data.append({'value':curRow['nb'],'label':'GSM'})
for row in session3.execute(queryRFID).fetchall() :
curRow = OrderedDict(row)
data.append({'value':curRow['nb'],'label':'RFID'})
data.sort(key = itemgetter('label'))
pendingSensorData = data
session1.close()
session2.close()
session3.close()
return pendingSensorData
开发者ID:romfabbro,项目名称:ecoReleve-Data,代码行数:51,代码来源:statistics.py
示例15: __setitem__
def __setitem__(self, key, item):
"""Set a new item in the container."""
if not isinstance(item, BaseContent):
raise ValueError("Content object is required")
if item.__uri__ == self.__uri__:
raise ValueError("Can't set to it self")
parents = [p.__uri__ for p in load_parents(self)]
if item.__uri__ in parents:
raise TypeError("Can't itself to chidlren")
if key in self.keys():
raise KeyError(key)
if item.__parent_uri__ is None:
event = ptah.events.ContentAddedEvent(item)
else:
event = ptah.events.ContentMovedEvent(item)
item.__name__ = key
item.__parent__ = self
item.__parent_uri__ = self.__uri__
item.__path__ = '%s%s/'%(self.__path__, key)
if item not in Session:
Session.add(item)
# temporary keys
if not self._v_items:
self._v_items = {key: item}
else:
self._v_items[key] = item
if key not in self._v_keys:
self._v_keys.append(key)
# recursevly update children paths
def update_path(container):
path = container.__path__
for item in container.values():
item.__path__ = '%s%s/'%(path, item.__name__)
if isinstance(item, BaseContainer):
update_path(item)
if isinstance(item, BaseContainer):
update_path(item)
get_current_registry().notify(event)
开发者ID:runyaga,项目名称:ptah,代码行数:51,代码来源:container.py
示例16: unit
def unit(request):
response = BaseFixture()
response.setUp()
response.request = testing.DummyRequest()
response.config = testing.setUp(request=response.request)
get_current_registry().settings[CONFIG_RESOURCES] = models
get_current_registry().settings['sqlalchemy.url'] =\
TEST_DATABASE_CONNECTION_STRING
def fin():
response.tearDown()
testing.tearDown()
request.addfinalizer(fin)
return response
开发者ID:drnextgis,项目名称:ps_alchemy,代码行数:15,代码来源:conftest.py
示例17: versioned
def versioned(path):
version = get_current_registry().settings['app_version']
entry_path = get_current_registry().settings['entry_path'] + '/'
if version is not None:
agnosticPath = make_agnostic(path)
parsedURL = urlparse(agnosticPath)
# we don't do version when behind pserve (at localhost)
if 'localhost:' not in parsedURL.netloc:
parts = parsedURL.path.split(entry_path, 1)
if len(parts) > 1:
parsedURL = parsedURL._replace(path=parts[0] + entry_path + version + '/' + parts[1])
agnosticPath = urlunparse(parsedURL)
return agnosticPath
else:
return path
开发者ID:justb4,项目名称:mf-chsdi3,代码行数:15,代码来源:helpers.py
示例18: service_entry
def service_entry(self):
"""Implement this as a property to have the context when looking for
the value of the setting"""
if self._service_entry is None:
settings = get_current_registry().settings
self._service_entry = settings.get('tokenserver.service_entry')
return self._service_entry
开发者ID:edmoz,项目名称:tokenserver,代码行数:7,代码来源:memorynode.py
示例19: get_text_amendment_diff
def get_text_amendment_diff(proposal, amendment):
text_analyzer = get_current_registry().getUtility(
ITextAnalyzer,'text_analyzer')
soup, textdiff = text_analyzer.render_html_diff(
getattr(proposal, 'text', ''),
getattr(amendment, 'text', ''))
return textdiff
开发者ID:jean,项目名称:nova-ideo,代码行数:7,代码来源:behaviors.py
示例20: initialize_sql
def initialize_sql(engine, drop_all=False):
DBSession.registry.clear()
DBSession.configure(bind=engine)
metadata.bind = engine
if drop_all or os.environ.get('KOTTI_TEST_DB_STRING'):
metadata.reflect()
metadata.drop_all(engine)
# Allow users of Kotti to cherry pick the tables that they want to use:
settings = get_current_registry().settings
tables = settings['kotti.use_tables'].strip() or None
if tables:
if 'settings' not in tables:
tables += ' settings'
tables = [metadata.tables[name] for name in tables.split()]
if engine.dialect.name == 'mysql': # pragma: no cover
from sqlalchemy.dialects.mysql.base import LONGBLOB
File.__table__.c.data.type = LONGBLOB()
metadata.create_all(engine, tables=tables)
for populate in get_settings()['kotti.populators']:
populate()
commit()
return DBSession()
开发者ID:fschulze,项目名称:Kotti,代码行数:27,代码来源:resources.py
注:本文中的pyramid.threadlocal.get_current_registry函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论