• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python threadlocal.get_current_registry函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pyramid.threadlocal.get_current_registry函数的典型用法代码示例。如果您正苦于以下问题:Python get_current_registry函数的具体用法?Python get_current_registry怎么用?Python get_current_registry使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_current_registry函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __delitem__

    def __delitem__(self, item, flush=True):
        """Delete a value from the container using the key."""

        if isinstance(item, string_types):
            item = self[item]

        if item.__parent_uri__ == self.__uri__:
            if isinstance(item, BaseContainer):
                for key in item.keys():
                    item.__delitem__(key, False)

            get_current_registry().notify(
                ptah.events.ContentDeletingEvent(item))

            name = item.__name__
            if self._v_keys:
                self._v_keys.remove(name)
            if self._v_items and name in self._v_items:
                del self._v_items[name]

            if item in Session:
                try:
                    Session.delete(item)
                    if flush:
                        Session.flush()
                except:
                    pass

            return

        raise KeyError(item)
开发者ID:runyaga,项目名称:ptah,代码行数:31,代码来源:container.py


示例2: get_searchable_text

def get_searchable_text(context, default):
    root = find_root(context)
    catalog = root.catalog
    registry = get_current_registry()
    discriminators = list(getattr(registry, 'searchable_text_discriminators', ()))
    results = set()
    registry = get_current_registry()
    for index in registry.catalog_indexhelper['searchable_text'].linked:
        if index not in catalog: #pragma: no coverage
            # In case a bad name was linked in searchable_text, no reason to die because of it.
            continue
        disc = catalog[index].discriminator
        if isinstance(disc, string_types):
            attr_discriminator = _AttrDiscriminator(disc)
            discriminators.append(attr_discriminator)
        else:
            discriminators.append(catalog[index].discriminator)
    for discriminator in discriminators:
        res = discriminator(context, default)
        if res is default:
            continue
        if not isinstance(res, string_types):
            res = str(res)
        res = res.strip()
        if res:
            results.add(res)
    text = " ".join(results)
    text = text.strip()
    return text and text or default
开发者ID:ArcheProject,项目名称:Arche,代码行数:29,代码来源:catalog.py


示例3: test_settings

    def test_settings(self):
        browser = self.login_testbrowser()
        ctrl = browser.getControl
        browser.getLink(u'Calendar').click()
        ctrl('Title').value = u'Calendar'
        ctrl(name=u'save').click()

        for c in range(6):
            browser.getLink(u'Calendar').click()
            browser.getLink(u'Event').click()
            ctrl('Title').value = u'Event %d' % c
            ctrl('Start').value = u'2112-08-23 20:00:00'
            ctrl(name=u'save').click()

        browser.open(self.BASE_URL)
        assert u"Event 5" not in browser.contents

        settings = get_current_registry().settings
        settings['kotti_calendar.upcoming_events_widget.events_count'] = u'nan'
        browser.open(self.BASE_URL)
        assert u"Event 5" not in browser.contents

        settings = get_current_registry().settings
        settings['kotti_calendar.upcoming_events_widget.events_count'] = u'7'
        browser.open(self.BASE_URL)
        assert u"Event 5" in browser.contents
开发者ID:ferewuz,项目名称:kotti_calendar,代码行数:26,代码来源:test_widgets.py


示例4: __delitem__

    def __delitem__(self, item, flush=True):
        """Delete a value from the container using the key."""

        if isinstance(item, string_types):
            item = self[item]

        if item.__parent_uri__ == self.__uri__:
            if isinstance(item, BaseContainer):
                for key in item.keys():
                    item.__delitem__(key, False)

            get_current_registry().notify(
                ptah.events.ContentDeletingEvent(item))

            Session = ptah.get_session().object_session(item)
            if item in Session:
                try:
                    Session.delete(item)
                    if flush:
                        Session.flush()
                except:
                    pass

            return

        raise KeyError(item)
开发者ID:djedproject,项目名称:ptahcms,代码行数:26,代码来源:container.py


示例5: update

 def update(self):
     result = {}
     text_analyzer = get_current_registry().getUtility(
                                               ITextAnalyzer,
                                               'text_analyzer')
     amendment_viewer = get_current_registry().getUtility(
                                                IAmendmentViewer,
                                                'amendment_viewer')
     souptextdiff, explanations = amendment_viewer.get_explanation_diff(
                                                 self.context, self.request)
     amendment_viewer.add_details(explanations,
                                  self.context,
                                  self.request,
                                  souptextdiff,
                                  self.readonly_explanation_template)
     text_diff = text_analyzer.soup_to_text(souptextdiff)
     not_published_ideas = [i for i in self.context.get_used_ideas() \
                            if not('published' in i.state)]
     values = {'context': self.context,
               'explanationtext': text_diff,
               'ideas': not_published_ideas}
     body = self.content(result=values, template=self.template)['body']
     item = self.adapt_item(body, self.viewid)
     result['coordinates'] = {self.coordinates:[item]}
     return result
开发者ID:jean,项目名称:nova-ideo,代码行数:25,代码来源:submit.py


示例6: __acl__

    def __acl__(self):

        acl = self.admins() + self.viewers() + self.editors() + [DENY_ALL]

        get_current_registry().notify(ACLRequest(acl, self.context))

        return acl
开发者ID:wyldebeast-wunderliebe,项目名称:w20e.pycms,代码行数:7,代码来源:security.py


示例7: notify_role_for_acceptance

def notify_role_for_acceptance(user_id, requester, model):
    """Notify the given ``user_id`` on ``model`` that s/he has been
    assigned a role on said model and can now accept the role.
    """
    accounts = get_current_registry().getUtility(IOpenstaxAccounts)
    settings = get_current_registry().settings
    base_url = settings['webview.url']
    link = urlparse.urljoin(base_url, '/users/role-acceptance/{}'
                            .format(model.id))

    subject = 'Requesting action on OpenStax CNX content'
    body = '''\
Hello {name},

{requester} added you to content titled {title}.
Please go to the following link to accept your roles and license:
{link}

Thank you from your friends at OpenStax CNX
'''.format(name=user_id,
           requester=requester,
           title=model.metadata['title'],
           link=link)
    try:
        accounts.send_message(user_id, subject, body)
    except urllib2.HTTPError:
        # Can't send messages via accounts for some reason - should be async!
        logger.warning("Failed sending notification message to {}"
                       .format(user_id))
        pass
开发者ID:Connexions,项目名称:cnx-authoring,代码行数:30,代码来源:utils.py


示例8: test_settings

def test_settings(webtest, root):

    from kotti_calendar.resources import Calendar
    from kotti_calendar.resources import Event

    root['calendar'] = calendar = Calendar(title=u'Calendar')

    for c in range(6):
        calendar['event-%d' % c] = Event(
            title=u'Event %d' % c,
            start=datetime(2112, 8, 23, c, 0, 0))

    transaction.commit()

    resp = webtest.get('/')
    assert u"Event 5" not in resp.body

    settings = get_current_registry().settings
    settings['kotti_calendar.upcoming_events_widget.events_count'] = u'nan'
    resp = webtest.get('/')
    assert u"Event 5" not in resp.body

    settings = get_current_registry().settings
    settings['kotti_calendar.upcoming_events_widget.events_count'] = u'7'
    resp = webtest.get('/')
    assert u"Event 5" in resp.body
开发者ID:Kotti,项目名称:kotti_calendar,代码行数:26,代码来源:test_widgets.py


示例9: get_text_amendment_diff_submitted

def get_text_amendment_diff_submitted(amendment, request):
    text_analyzer = get_current_registry().getUtility(
                             ITextAnalyzer, 'text_analyzer')
    amendment_viewer = get_current_registry().getUtility(
                             IAmendmentViewer, 'amendment_viewer')
    souptextdiff, explanations = amendment_viewer.get_explanation_diff(
                                                    amendment, request)
    amendment_viewer.add_details(explanations, amendment, request, souptextdiff)
    return explanations, text_analyzer.soup_to_text(souptextdiff)
开发者ID:jean,项目名称:nova-ideo,代码行数:9,代码来源:behaviors.py


示例10: get_settings

def get_settings():
    from kotti.resources import Settings
    session = DBSession()
    db_settings = session.query(Settings).order_by(desc(Settings.id)).first()
    if db_settings is not None:
        reg_settings = dict(get_current_registry().settings)
        reg_settings.update(db_settings.data)
        return reg_settings
    else:
        return get_current_registry().settings
开发者ID:mr-krille,项目名称:Kotti,代码行数:10,代码来源:__init__.py


示例11: update

    def update(self, **data):
        if self.__type__:
            tinfo = self.__type__

            for field in tinfo.fieldset.fields():
                val = data.get(field.name, field.default)
                if val is not form.null:
                    setattr(self, field.name, val)

            get_current_registry().notify(
                ptah.events.ContentModifiedEvent(self))
开发者ID:djedproject,项目名称:ptahcms,代码行数:11,代码来源:content.py


示例12: uncheckedDatas_graph

def uncheckedDatas_graph(request):
    session = request.dbsession

    viewArgos = Base.metadata.tables["VArgosData_With_EquipIndiv"]
    queryArgos = (
        select([viewArgos.c["type"].label("type"), func.count("*").label("nb")])
        .where(viewArgos.c["checked"] == 0)
        .group_by(viewArgos.c["type"])
    )

    viewGSM = Base.metadata.tables["VGSMData_With_EquipIndiv"]
    queryGSM = select([func.count("*").label("nb")]).where(viewGSM.c["checked"] == 0)

    queryRFID = select([func.count("*").label("nb")]).where(Rfid.checked == 0)
    data = []

    session1 = threadlocal.get_current_registry().dbmaker()
    session2 = threadlocal.get_current_registry().dbmaker()
    session3 = threadlocal.get_current_registry().dbmaker()

    global graphDataDate
    global pendingSensorData

    d = datetime.datetime.now() - datetime.timedelta(days=1)

    if graphDataDate["pendingSensorData"] is None or graphDataDate["pendingSensorData"] < d:
        graphDataDate["pendingSensorData"] = datetime.datetime.now()

        argosData = session1.execute(queryArgos).fetchall()
        for row in argosData:
            curRow = OrderedDict(row)
            lab = curRow["type"].upper()
            if lab == "ARG":
                lab = "ARGOS"
            data.append({"value": curRow["nb"], "label": lab})

        for row in session2.execute(queryGSM).fetchall():
            curRow = OrderedDict(row)
            data.append({"value": curRow["nb"], "label": "GSM"})

        for row in session3.execute(queryRFID).fetchall():
            curRow = OrderedDict(row)
            data.append({"value": curRow["nb"], "label": "RFID"})
        data.sort(key=itemgetter("label"))
        pendingSensorData = data
    else:
        print("unchecked data already fetched")

    session1.close()
    session2.close()
    session3.close()

    return pendingSensorData
开发者ID:gerald13,项目名称:ecoReleve-Data,代码行数:53,代码来源:statistics.py


示例13: after_request

def after_request(response):
    if flask.request.method == 'OPTIONS':
        return response

    if 200 <= response.status_code < 300:
        match = re.match(r'^store\.(\w+)_annotation$', flask.request.endpoint)
        if match:
            action = match.group(1)
            if action != 'delete':
                annotation = json.loads(response.data)
                event = events.AnnotatorStoreEvent(annotation, action)
                get_current_registry().notify(event)
    return response
开发者ID:abigailricarte,项目名称:h,代码行数:13,代码来源:store.py


示例14: uncheckedDatas_graph

def uncheckedDatas_graph(request):
    session = request.dbsession

    viewArgos = Base.metadata.tables['VArgosData_With_EquipIndiv']
    queryArgos= select([viewArgos.c['type'].label('type'),func.count('*').label('nb')]
        ).where(viewArgos.c['checked'] == 0
        ).group_by(viewArgos.c['type'])

    viewGSM = Base.metadata.tables['VGSMData_With_EquipIndiv']
    queryGSM= select([func.count('*').label('nb')]
        ).where(viewGSM.c['checked'] == 0)

    queryRFID = select([func.count('*').label('nb')]
        ).where(Rfid.checked == 0)
    data = []

    session1 = threadlocal.get_current_registry().dbmaker()
    session2 = threadlocal.get_current_registry().dbmaker()
    session3 = threadlocal.get_current_registry().dbmaker()

    global graphDataDate
    global pendingSensorData

    d = datetime.datetime.now() - datetime.timedelta(days=1)

    if graphDataDate['pendingSensorData'] is None or graphDataDate['pendingSensorData'] < d :
        graphDataDate['pendingSensorData'] = datetime.datetime.now()
        
        argosData = session1.execute(queryArgos).fetchall()
        for row in argosData:
            curRow = OrderedDict(row)
            lab = curRow['type'].upper()
            if lab == 'ARG':
                lab = 'ARGOS'
            data.append({'value':curRow['nb'],'label':lab})

        for row in session2.execute(queryGSM).fetchall() :
            curRow = OrderedDict(row)
            data.append({'value':curRow['nb'],'label':'GSM'})

        for row in session3.execute(queryRFID).fetchall() :
            curRow = OrderedDict(row)
            data.append({'value':curRow['nb'],'label':'RFID'})
        data.sort(key = itemgetter('label'))
        pendingSensorData = data

    session1.close()
    session2.close()
    session3.close()

    return pendingSensorData
开发者ID:romfabbro,项目名称:ecoReleve-Data,代码行数:51,代码来源:statistics.py


示例15: __setitem__

    def __setitem__(self, key, item):
        """Set a new item in the container."""

        if not isinstance(item, BaseContent):
            raise ValueError("Content object is required")

        if item.__uri__ == self.__uri__:
            raise ValueError("Can't set to it self")

        parents = [p.__uri__ for p in load_parents(self)]
        if item.__uri__ in parents:
            raise TypeError("Can't itself to chidlren")

        if key in self.keys():
            raise KeyError(key)

        if item.__parent_uri__ is None:
            event = ptah.events.ContentAddedEvent(item)
        else:
            event = ptah.events.ContentMovedEvent(item)

        item.__name__ = key
        item.__parent__ = self
        item.__parent_uri__ = self.__uri__
        item.__path__ = '%s%s/'%(self.__path__, key)

        if item not in Session:
            Session.add(item)

        # temporary keys
        if not self._v_items:
            self._v_items = {key: item}
        else:
            self._v_items[key] = item

        if key not in self._v_keys:
            self._v_keys.append(key)

        # recursevly update children paths
        def update_path(container):
            path = container.__path__
            for item in container.values():
                item.__path__ = '%s%s/'%(path, item.__name__)

                if isinstance(item, BaseContainer):
                    update_path(item)

        if isinstance(item, BaseContainer):
            update_path(item)

        get_current_registry().notify(event)
开发者ID:runyaga,项目名称:ptah,代码行数:51,代码来源:container.py


示例16: unit

def unit(request):
    response = BaseFixture()
    response.setUp()
    response.request = testing.DummyRequest()
    response.config = testing.setUp(request=response.request)
    get_current_registry().settings[CONFIG_RESOURCES] = models
    get_current_registry().settings['sqlalchemy.url'] =\
        TEST_DATABASE_CONNECTION_STRING

    def fin():
        response.tearDown()
        testing.tearDown()

    request.addfinalizer(fin)
    return response
开发者ID:drnextgis,项目名称:ps_alchemy,代码行数:15,代码来源:conftest.py


示例17: versioned

def versioned(path):
    version = get_current_registry().settings['app_version']
    entry_path = get_current_registry().settings['entry_path'] + '/'
    if version is not None:
        agnosticPath = make_agnostic(path)
        parsedURL = urlparse(agnosticPath)
        # we don't do version when behind pserve (at localhost)
        if 'localhost:' not in parsedURL.netloc:
            parts = parsedURL.path.split(entry_path, 1)
            if len(parts) > 1:
                parsedURL = parsedURL._replace(path=parts[0] + entry_path + version + '/' + parts[1])
                agnosticPath = urlunparse(parsedURL)
        return agnosticPath
    else:
        return path
开发者ID:justb4,项目名称:mf-chsdi3,代码行数:15,代码来源:helpers.py


示例18: service_entry

 def service_entry(self):
     """Implement this as a property to have the context when looking for
     the value of the setting"""
     if self._service_entry is None:
         settings = get_current_registry().settings
         self._service_entry = settings.get('tokenserver.service_entry')
     return self._service_entry
开发者ID:edmoz,项目名称:tokenserver,代码行数:7,代码来源:memorynode.py


示例19: get_text_amendment_diff

def get_text_amendment_diff(proposal, amendment):
    text_analyzer = get_current_registry().getUtility(
                                 ITextAnalyzer,'text_analyzer')
    soup, textdiff =  text_analyzer.render_html_diff(
                            getattr(proposal, 'text', ''), 
                            getattr(amendment, 'text', ''))
    return textdiff
开发者ID:jean,项目名称:nova-ideo,代码行数:7,代码来源:behaviors.py


示例20: initialize_sql

def initialize_sql(engine, drop_all=False):
    DBSession.registry.clear()
    DBSession.configure(bind=engine)
    metadata.bind = engine

    if drop_all or os.environ.get('KOTTI_TEST_DB_STRING'):
        metadata.reflect()
        metadata.drop_all(engine)

    # Allow users of Kotti to cherry pick the tables that they want to use:
    settings = get_current_registry().settings
    tables = settings['kotti.use_tables'].strip() or None
    if tables:
        if 'settings' not in tables:
            tables += ' settings'
        tables = [metadata.tables[name] for name in tables.split()]

    if engine.dialect.name == 'mysql':  # pragma: no cover
        from sqlalchemy.dialects.mysql.base import LONGBLOB
        File.__table__.c.data.type = LONGBLOB()

    metadata.create_all(engine, tables=tables)
    for populate in get_settings()['kotti.populators']:
        populate()
    commit()

    return DBSession()
开发者ID:fschulze,项目名称:Kotti,代码行数:27,代码来源:resources.py



注:本文中的pyramid.threadlocal.get_current_registry函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python threadlocal.get_current_request函数代码示例发布时间:2022-05-27
下一篇:
Python testing.DummyRequest类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap