本文整理汇总了Python中pyramid.traversal.find_resource函数的典型用法代码示例。如果您正苦于以下问题:Python find_resource函数的具体用法?Python find_resource怎么用?Python find_resource使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了find_resource函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __call__
def __call__(self, node, value):
root = find_root(self.context)
try:
find_resource(root, value)
except KeyError:
raise colander.Invalid(
node, _("Not an existing valid path."))
开发者ID:ArcheProject,项目名称:Arche,代码行数:7,代码来源:validators.py
示例2: _find_resources
def _find_resources(root,
entry: dict) -> (IUser, IBadge, sheets.badge.IBadgeable):
userpath = entry.get('user')
user = find_resource(root, userpath)
badgepath = entry.get('badge')
badge = find_resource(root, badgepath)
badgeablepath = entry.get('badgeable')
badgeable = find_resource(root, badgeablepath)
return user, badge, badgeable
开发者ID:Janaba,项目名称:adhocracy3,代码行数:9,代码来源:ad_assign_badges.py
示例3: linkFrom
def linkFrom(validator, linkFrom, instance, schema):
# avoid circular import
from snovault import Item, TYPES, COLLECTIONS
request = get_current_request()
collections = request.registry[COLLECTIONS]
linkType, linkProp = linkFrom.split('.')
if validator.is_type(instance, "string"):
base = collections[linkType]
try:
item = find_resource(base, instance.replace(':', '%3A'))
if item is None:
raise KeyError()
except KeyError:
error = "%r not found" % instance
yield ValidationError(error)
return
if not isinstance(item, Item):
error = "%r is not a linkable resource" % instance
yield ValidationError(error)
return
if linkType not in set([item.type_info.name] + item.type_info.base_types):
error = "%r is not of type %s" % (instance, repr(linkType))
yield ValidationError(error)
return
pass
else:
path = instance.get('@id')
if validator._serialize:
lv = len(validator._validated)
if '@id' in instance:
del instance['@id']
# treat the link property as not required
# because it will be filled in when the child is created/updated
subschema = request.registry[TYPES][linkType].schema
subschema = copy.deepcopy(subschema)
if linkProp in subschema['required']:
subschema['required'].remove(linkProp)
for error in validator.descend(instance, subschema):
yield error
if validator._serialize:
validated_instance = validator._validated[lv]
del validator._validated[lv:]
if path is not None:
item = find_resource(request.root, path.replace(':', '%3A'))
validated_instance['uuid'] = str(item.uuid)
elif 'uuid' in validated_instance: # where does this come from?
del validated_instance['uuid']
validator._validated[-1] = validated_instance
开发者ID:j1z0,项目名称:snovault,代码行数:52,代码来源:schema_utils.py
示例4: _deserialize_location_or_url
def _deserialize_location_or_url(self, node, value):
if value.startswith('/'):
context = node.bindings['context']
return find_resource(context, value)
else:
context = node.bindings['context']
request = node.bindings['request']
application_url_len = len(request.application_url)
if application_url_len > len(str(value)):
raise KeyError
# Fixme: This does not work with :term:`virtual hosting`
path = value[application_url_len:]
return find_resource(context, path)
开发者ID:Janaba,项目名称:adhocracy3,代码行数:13,代码来源:__init__.py
示例5: isReport
def isReport(self, name):
""" See IMailinDispatcher.
"""
root = find_root(self.context)
if name in root.list_aliases:
return True
pd = find_peopledirectory(self.context)
tokens = name.split('+')
try:
find_resource(pd, tokens)
except KeyError:
return False
return True
开发者ID:Falmarri,项目名称:karl,代码行数:13,代码来源:mailin.py
示例6: _deserialize_location_or_url
def _deserialize_location_or_url(self, node, value):
if value.startswith('/'):
context = node.bindings['context']
return find_resource(context, value)
else:
context = node.bindings['context']
request = node.bindings['request']
root_url = request.resource_url(request.root,
route_name=API_ROUTE_NAME)
root_url_len = len(root_url)
if root_url_len > len(str(value)):
raise KeyError
path = value[root_url_len:]
return find_resource(context, path)
开发者ID:liqd,项目名称:adhocracy3,代码行数:14,代码来源:__init__.py
示例7: should_show_calendar_tab
def should_show_calendar_tab(self):
"""whether to show the calendar tab in the header menu
user must be staff, and the calendar object must exist"""
if self._should_show_calendar_tab is None:
if not self.user_is_staff:
self._should_show_calendar_tab = False
else:
calendar_path = "/offices/calendar"
try:
find_resource(self.site, calendar_path)
self._should_show_calendar_tab = True
except KeyError:
self._should_show_calendar_tab = False
return self._should_show_calendar_tab
开发者ID:lslaz1,项目名称:karl,代码行数:15,代码来源:api.py
示例8: batch_upgrade
def batch_upgrade(request):
request.datastore = 'database'
transaction.get().setExtendedInfo('upgrade', True)
batch = request.json['batch']
root = request.root
storage = request.registry[STORAGE].write
session = storage.DBSession()
results = []
for uuid in batch:
item_type = None
update = False
error = False
sp = session.begin_nested()
try:
item = find_resource(root, uuid)
item_type = item.type_info.item_type
update, errors = update_item(storage, item)
except Exception:
logger.exception('Error updating: /%s/%s', item_type, uuid)
sp.rollback()
error = True
else:
if errors:
errortext = [
'%s: %s' % ('/'.join(error.path) or '<root>', error.message)
for error in errors]
logger.error(
'Validation failure: /%s/%s\n%s', item_type, uuid, '\n'.join(errortext))
sp.rollback()
error = True
else:
sp.commit()
results.append((item_type, uuid, update, error))
return {'results': results}
开发者ID:Kapeel,项目名称:encoded,代码行数:34,代码来源:batchupgrade.py
示例9: get_font
def get_font(self):
try:
font = traversal.find_resource(traversal.find_root(self.get_model()), self.font)
except:
return None
else:
return font
开发者ID:makeartweb,项目名称:makeartweb2,代码行数:7,代码来源:models.py
示例10: user_tagged_content
def user_tagged_content(event):
if ITagAddedEvent.providedBy(event):
request = get_current_request()
context = getattr(request, 'context', None)
if context is None:
return
events = find_events(context)
if not events:
return
site = find_site(context)
catalog = find_catalog(context)
path = catalog.document_map.address_for_docid(event.item)
tagged = find_resource(site, path)
if tagged is None:
return
profile_id = event.user
if profile_id in (None, 'None'):
return
profile = site['profiles'][profile_id]
info = _getInfo(profile, tagged)
if info is None:
return
if info['content_type'] == 'Community':
info['flavor'] = 'tagged_community'
elif info['content_type'] == 'Person':
info['flavor'] = 'tagged_profile'
else:
info['flavor'] = 'tagged_other'
info['operation'] = 'tagged'
info['tagname'] = event.name
events.push(**info)
开发者ID:araymund,项目名称:karl,代码行数:31,代码来源:contentfeeds.py
示例11: main
def main():
parser = argparse.ArgumentParser()
parser.add_argument("config_uri", help="Paster ini file to load settings from")
parser.add_argument("path", help="Which path to extract info from")
args = parser.parse_args()
env = bootstrap(args.config_uri)
root = env['root']
request = env['request']
print "Reading poll times from path %r" % args.path
#Just to make sure path exists
find_resource(root, args.path)
query = "path == '%s'" % args.path
query += " and type_name == 'Poll'"
docids = root.catalog.query(query)[1]
for poll in request.resolve_docids(docids, perm=None):
print_voting_timestamps(poll, request)
开发者ID:VoteIT,项目名称:voteit_devel_buildout,代码行数:16,代码来源:voting_timestamps.py
示例12: _fetch_organisation_by_name
def _fetch_organisation_by_name(root, organisation_path):
organisation = find_resource(root, organisation_path)
if organisation is None:
print('could not find organisation %s' % (organisation))
sys.exit()
else:
return organisation
开发者ID:libscott,项目名称:adhocracy3,代码行数:7,代码来源:create_process_for_region.py
示例13: main
def main(argv=sys.argv):
def usage(msg):
print msg
sys.exit(2)
description = "Postprocess new recordings as they are made."
usage = "usage: %prog config_uri"
parser = optparse.OptionParser(usage, description=description)
opts, args = parser.parse_args(argv[1:])
try:
config_uri = args[0]
except KeyError:
usage("Requires a config_uri as an argument")
setup_logging(config_uri)
env = bootstrap(config_uri)
root = env["root"]
redis = get_redis(env["request"])
while True:
path = redis.blpop("yss.new-recordings", 0)[1]
time.sleep(1)
transaction.abort()
recording = find_resource(root, path)
try:
postprocess(recording)
except:
redis.rpush("yss.new-recorings", path)
raise
开发者ID:notaliens,项目名称:youshouldsing,代码行数:28,代码来源:postproc.py
示例14: url_to_resource
def url_to_resource(self, url):
"""
Returns the resource that is addressed by the given URL.
:param str url: URL to convert
:return: member or collection resource
:note: If the query string in the URL has multiple values for a
query parameter, the last definition in the query string wins.
"""
parsed = urlparse.urlparse(url)
parsed_path = parsed.path # namedtupble problem pylint: disable=E1101
rc = find_resource(self.__request.root, traversal_path(parsed_path))
if ICollectionResource in provided_by(rc):
# In case we found a collection, we have to filter, order, slice.
parsed_query = parsed.query # namedtuple problem pylint: disable=E1101
params = dict(parse_qsl(parsed_query))
filter_string = params.get('q')
if not filter_string is None:
rc.filter = \
UrlPartsConverter.make_filter_specification(filter_string)
order_string = params.get('sort')
if not order_string is None:
rc.order = \
UrlPartsConverter.make_order_specification(order_string)
start_string = params.get('start')
size_string = params.get('size')
if not (start_string is None or size_string is None):
rc.slice = \
UrlPartsConverter.make_slice_key(start_string, size_string)
elif not IMemberResource in provided_by(rc):
raise ValueError('Traversal found non-resource object "%s".' % rc)
return rc
开发者ID:b8va,项目名称:everest,代码行数:33,代码来源:url.py
示例15: _set_workflow_state
def _set_workflow_state(
root: IResource, registry: Registry, resource_path: str, states: [str], absolute=False, reset=False
):
resource = find_resource(root, resource_path)
states_to_transition = _get_states_to_transition(resource, registry, states, absolute, reset)
transition_to_states(resource, states_to_transition, registry, reset=reset)
transaction.commit()
开发者ID:libscott,项目名称:adhocracy3,代码行数:7,代码来源:set_workflow_state.py
示例16: test_username_is_resolved_to_his_path
def test_username_is_resolved_to_his_path(self, registry):
from adhocracy_core.scripts import import_resources
from .import_users import _get_user_locator
(self._tempfd, filename) = mkstemp()
with open(filename, 'w') as f:
f.write(json.dumps([
{"path": "/principals/users/badge_assignments",
"content_type": "adhocracy_core.resources.badge.IBadgeAssignment",
"data": {"adhocracy_core.sheets.badge.IBadgeAssignment":
{"subject": "user_by_login:god",
"badge": "/orga/badges/badge0",
"object": "/principals/users/0000000"
},
"adhocracy_core.sheets.name.IName": {"name": "assign0"}}
}]))
root = registry.content.create(IRootPool.__identifier__)
appstructs = {'adhocracy_core.sheets.name.IName': {'name': 'orga'}}
orga = registry.content.create(IOrganisation.__identifier__, root,
appstructs=appstructs, registry=registry)
add_badges_service(orga, registry, {})
badge_appstructs = {'adhocracy_core.sheets.name.IName': {'name': 'badge0'}}
registry.content.create(IBadge.__identifier__,
orga['badges'],
appstructs=badge_appstructs,
registry=registry)
import_resources(root, registry, filename)
assignments = find_resource(root, '/principals/users/badge_assignments/')
assignment = list(assignments.values())[0]
subject = get_sheet_field(assignment, IBadgeAssignment, 'subject')
user_locator = _get_user_locator(root, registry)
god = user_locator.get_user_by_login('god')
assert subject == god
开发者ID:libscott,项目名称:adhocracy3,代码行数:35,代码来源:test_init.py
示例17: checkPermission
def checkPermission(self, info):
""" Does user have permission to author content in the given context?
Uses ACL security policy to test.
"""
users = find_users(self.context)
for target in info['targets']:
if 'error' in target:
continue
report_name = target.get('report')
if report_name is not None:
pd = find_peopledirectory(self.context)
context = find_resource(pd, report_name.split('+'))
permission = "email"
else:
communities = find_communities(self.context)
community = communities[target['community']]
context = community[target['tool']]
permission = "create" # XXX In theory could depend on target
user = users.get_by_id(info['author'])
if user is not None:
user = dict(user)
user['repoze.who.userid'] = info['author']
# BFG Security API always assumes http request, so we fabricate a
# fake request.
request = Request.blank('/')
request.environ['repoze.who.identity'] = user
if not has_permission(permission, context, request):
target['error'] = 'Permission Denied'
开发者ID:Falmarri,项目名称:karl,代码行数:31,代码来源:mailin.py
示例18: forbidden
def forbidden(request):
if not hasattr(request, 'user'):
locale = get_localizer(request)
request.session.flash(locale.translate(_("Login required")))
request.session['came_from'] = request.url
return HTTPFound(request.resource_url(request.root, 'login'))
homes = request.user.find_home()
at_root = not request.path_info.strip('/')
if at_root and len(homes) == 1:
return HTTPFound(request.application_url.rstrip('/') + homes[0])
at_root = at_root and homes
homes = [find_resource(request.root, path) for path in homes]
homes = [{'title': home.title, 'url': request.resource_url(home)}
for home in homes]
request.layout_manager.use_layout('anonymous')
layout = request.layout_manager.layout
if at_root:
layout.page_title = _("Welcome to Gathr")
else:
layout.page_title = _('Forbidden')
return {
'at_root': at_root,
'homes': homes,
}
开发者ID:tethr,项目名称:gathr,代码行数:26,代码来源:forbidden.py
示例19: test_testing_resources
def test_testing_resources(self):
from pyramid.traversal import find_resource
from pyramid.interfaces import ITraverser
ob1 = object()
ob2 = object()
resources = {'/ob1':ob1, '/ob2':ob2}
config = self._makeOne(autocommit=True)
config.testing_resources(resources)
adapter = config.registry.getAdapter(None, ITraverser)
result = adapter(DummyRequest({'PATH_INFO':'/ob1'}))
self.assertEqual(result['context'], ob1)
self.assertEqual(result['view_name'], '')
self.assertEqual(result['subpath'], ())
self.assertEqual(result['traversed'], (text_('ob1'),))
self.assertEqual(result['virtual_root'], ob1)
self.assertEqual(result['virtual_root_path'], ())
result = adapter(DummyRequest({'PATH_INFO':'/ob2'}))
self.assertEqual(result['context'], ob2)
self.assertEqual(result['view_name'], '')
self.assertEqual(result['subpath'], ())
self.assertEqual(result['traversed'], (text_('ob2'),))
self.assertEqual(result['virtual_root'], ob2)
self.assertEqual(result['virtual_root_path'], ())
self.assertRaises(KeyError, adapter, DummyRequest({'PATH_INFO':'/ob3'}))
try:
config.begin()
self.assertEqual(find_resource(None, '/ob1'), ob1)
finally:
config.end()
开发者ID:7924102,项目名称:pyramid,代码行数:29,代码来源:test_testing.py
示例20: recent_items
def recent_items(self):
if self._recent_items is None:
community = find_interface(self.context, ICommunity)
if community is not None:
stmt = """SELECT docid from pgtextindex
WHERE community_docid='%s'
AND content_type not in ('IInvitation',
'ICalendar', 'ICalendarLayer', 'ICalendarCategory',
'IBlog', 'ICommunityRootFolder', 'IWiki')
ORDER BY modification_date DESC
LIMIT 20"""
catalog = find_catalog(self.context)
index = catalog['texts']
docids = index.get_sql_catalog_results(stmt % community.docid)
self._recent_items = []
for docid in docids:
path = catalog.document_map.address_for_docid(docid[0])
try:
model = find_resource(self.context, path)
except KeyError:
continue
if not has_permission('view', model, self.request):
continue
try:
adapted = getMultiAdapter((model, self.request),
IGridEntryInfo)
except ComponentLookupError:
continue
self._recent_items.append(adapted)
return self._recent_items
开发者ID:karlproject,项目名称:karl,代码行数:31,代码来源:api.py
注:本文中的pyramid.traversal.find_resource函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论