本文整理汇总了Python中pyramid.threadlocal.get_current_request函数的典型用法代码示例。如果您正苦于以下问题:Python get_current_request函数的具体用法?Python get_current_request怎么用?Python get_current_request使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_current_request函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_verify_login_record_is_updated
def test_verify_login_record_is_updated():
# Use a fake session for this test.
user_id = db_utils.create_user()
sess = get_current_request().weasyl_session = create_session(user_id)
db = d.connect()
db.add(sess)
db.flush()
d.engine.execute("UPDATE login SET last_login = -1 WHERE userid = %(id)s", id=user_id)
login.signin(get_current_request(), user_id)
last_login = d.engine.scalar("SELECT last_login FROM login WHERE userid = %(id)s", id=user_id)
assert last_login > -1
开发者ID:Weasyl,项目名称:weasyl,代码行数:11,代码来源:test_signin.py
示例2: linkFrom
def linkFrom(validator, linkFrom, instance, schema):
# avoid circular import
from contentbase import Item, TYPES
linkType, linkProp = linkFrom.split('.')
if validator.is_type(instance, "string"):
request = get_current_request()
base = request.root.by_item_type[linkType]
try:
item = find_resource(base, instance.replace(':', '%3A'))
if item is None:
raise KeyError()
except KeyError:
error = "%r not found" % instance
yield ValidationError(error)
return
if not isinstance(item, Item):
error = "%r is not a linkable resource" % instance
yield ValidationError(error)
return
if linkType not in set([item.item_type] + item.base_types):
error = "%r is not of type %s" % (instance, repr(linkType))
yield ValidationError(error)
return
pass
else:
path = instance.get('@id')
request = get_current_request()
if validator._serialize:
lv = len(validator._validated)
if '@id' in instance:
del instance['@id']
# treat the link property as not required
# because it will be filled in when the child is created/updated
subschema = request.registry[TYPES][linkType].schema
subschema = copy.deepcopy(subschema)
if linkProp in subschema['required']:
subschema['required'].remove(linkProp)
for error in validator.descend(instance, subschema):
yield error
if validator._serialize:
validated_instance = validator._validated[lv]
del validator._validated[lv:]
if path is not None:
item = find_resource(request.root, path.replace(':', '%3A'))
validated_instance['uuid'] = str(item.uuid)
elif 'uuid' in validated_instance: # where does this come from?
del validated_instance['uuid']
validator._validated[-1] = validated_instance
开发者ID:ClinGen,项目名称:clincoded,代码行数:52,代码来源:schema_utils.py
示例3: task_prerun_signal
def task_prerun_signal(task_id, task, args, kwargs, **kwaargs):
if hasattr(celery, "pyramid"):
env = celery.pyramid
env = prepare(registry=env["request"].registry)
proper_base_url = env["request"].registry.settings["base_url"]
tmp_request = Request.blank("/", base_url=proper_base_url)
# ensure tasks generate url for right domain from config
env["request"].environ["HTTP_HOST"] = tmp_request.environ["HTTP_HOST"]
env["request"].environ["SERVER_PORT"] = tmp_request.environ["SERVER_PORT"]
env["request"].environ["SERVER_NAME"] = tmp_request.environ["SERVER_NAME"]
env["request"].environ["wsgi.url_scheme"] = tmp_request.environ[
"wsgi.url_scheme"
]
get_current_request().tm.begin()
开发者ID:ergo,项目名称:testscaffold,代码行数:14,代码来源:__init__.py
示例4: push
def push(self, message):
collection = get_current_request().db[self.__collection__]
now = datetime.datetime.utcnow()
message['created'] = now
request = get_current_request()
for _id in self.shared_by:
if request.user._id != _id:
other_id = _id
collection.update({'_id': self._id, 'user_data._id': other_id},
{'$push': {'messages': message},
'$set': {'date_modified': now },
'$inc': {'user_data.$.new' : 1},
})
开发者ID:xaevir,项目名称:RubyRate-python,代码行数:14,代码来源:models.py
示例5: test_start_scheduler
def test_start_scheduler(mock):
from papaye.config.startup import start_scheduler
from papaye.config.utils import SettingsReader
config = Configurator()
config.registry.settings = {
'papaye.cache': 'true',
'papaye.scheduler': 'papaye.tests.test_config:SchedulerTest',
}
config.add_directive('settings_reader', lambda c: SettingsReader(c))
start_scheduler(config)
get_current_request()
assert isinstance(mock.call_args[0][0], SchedulerTest)
开发者ID:rcommande,项目名称:papaye,代码行数:15,代码来源:test_config.py
示例6: test_start_scheduler_without_cache_in_configuration
def test_start_scheduler_without_cache_in_configuration(mock):
from papaye.config.startup import start_scheduler
from papaye.tasks.devices import DummyScheduler
from papaye.config.utils import SettingsReader
config = Configurator()
config.registry.settings = {
'papaye.cache': 'true',
}
config.add_directive('settings_reader', lambda c: SettingsReader(c))
start_scheduler(config)
get_current_request()
assert isinstance(mock.call_args[0][0], DummyScheduler)
开发者ID:rcommande,项目名称:papaye,代码行数:15,代码来源:test_config.py
示例7: timestruct
def timestruct(jinja_ctx, context, **kw):
"""Render both humanized time and accurate time.
* show_timezone
* target_timezone
* source_timezone
* format
"""
if not context:
return ""
assert type(context) in (datetime.datetime, datetime.time,)
request = jinja_ctx.get('request') or get_current_request()
if not jinja_ctx:
return ""
kw = kw.copy()
kw["time"] = context
kw["format"] = kw.get("format") or "YYYY-MM-DD HH:mm"
return Markup(render("core/timestruct.html", kw, request=request))
开发者ID:agronholm,项目名称:websauna,代码行数:26,代码来源:templatecontext.py
示例8: resolve_next_date
def resolve_next_date(self, args, info):
"""Return first date in the start_end interval.
"""
request = get_current_request()
start, end = getattr(request, 'start_end', (None, None))
dates = occurences_start(self._root, 'dates', from_=start, until=end)
return dates[0].date() if dates else None
开发者ID:ecreall,项目名称:lagendacommun,代码行数:7,代码来源:schema.py
示例9: add_renderer_globals
def add_renderer_globals(event):
""" add globals to templates
csrf_token - bare token
csrf_token_field - hidden input field with token inserted
flash - flash messages
"""
request = event.get('request')
settings = get_current_registry().settings
template = settings['apex.apex_render_template']
if request is None:
request = get_current_request()
csrf_token = request.session.get_csrf_token()
globs = {
'csrf_token': csrf_token,
'csrf_token_field': '<input type="hidden" name="csrf_token" value="%s" />' % csrf_token,
'flash': flash,
}
if template.endswith('.pt'):
globs['flash_t'] = get_renderer('apex:templates/flash_template.pt').implementation()
event.update(globs)
开发者ID:nagyv,项目名称:pyramid_apex,代码行数:27,代码来源:subscribers.py
示例10: get_url
def get_url(self, namespace, key, seconds=3600, https=False):
"""Returns a Pyramid static URL.
If you use another web framework, please override this method.
"""
from pyramid.threadlocal import get_current_request
return get_current_request().static_url("/".join((self.storage_path, str(namespace), key)))
开发者ID:pitymaia,项目名称:keepluggable,代码行数:7,代码来源:local.py
示例11: beforerender_subscriber
def beforerender_subscriber(event):
request = event['request']
if request is None:
request = get_current_request()
if getattr(request, 'debug_toolbar', None) is not None:
for panel in request.debug_toolbar.panels:
panel.process_beforerender(event)
开发者ID:clintron,项目名称:pyramid_debugtoolbar,代码行数:7,代码来源:toolbar.py
示例12: after_request
def after_request(response):
if flask.request.method == 'OPTIONS':
in_value = response.headers.get('Access-Control-Allow-Headers', '')
allowed = [h.strip() for h in in_value.split(',')]
allowed.append('X-Client-ID')
out_value = ', '.join(allowed)
response.headers['Access-Control-Allow-Headers'] = out_value
return response
if 200 <= response.status_code < 300:
match = re.match(r'^store\.(\w+)_annotation$', flask.request.endpoint)
if match:
request = get_current_request()
action = match.group(1)
if action == 'delete':
data = json.loads(flask.request.data)
else:
data = json.loads(response.data)
annotation = wrap_annotation(data)
event = events.AnnotationEvent(request, annotation, action)
request.registry.notify(event)
return response
开发者ID:mrienstra,项目名称:h,代码行数:25,代码来源:store.py
示例13: indirect_idea_content_links
def indirect_idea_content_links(self):
from pyramid.threadlocal import get_current_request
request = get_current_request()
if request:
return self.indirect_idea_content_links_with_cache()
else:
return self.indirect_idea_content_links_without_cache()
开发者ID:assembl,项目名称:assembl,代码行数:7,代码来源:post.py
示例14: before_request
def before_request():
request = get_current_request()
annotation_ctor = request.registry.getUtility(interfaces.IAnnotationClass)
flask.g.annotation_class = annotation_ctor
flask.g.auth = Authenticator(request)
flask.g.authorize = functools.partial(authorize, request)
flask.g.before_annotation_update = anonymize_deletes
开发者ID:nlholdem,项目名称:h,代码行数:7,代码来源:api.py
示例15: user_tagged_content
def user_tagged_content(event):
if ITagAddedEvent.providedBy(event):
request = get_current_request()
context = getattr(request, "context", None)
if context is None:
return
events = find_events(context)
if not events:
return
site = find_site(context)
catalog = find_catalog(context)
path = catalog.document_map.address_for_docid(event.item)
tagged = find_resource(site, path)
if tagged is None:
return
profile_id = event.user
if profile_id in (None, "None"):
return
profile = site["profiles"][profile_id]
info = _getInfo(profile, tagged)
if info is None:
return
if info["content_type"] == "Community":
info["flavor"] = "tagged_community"
elif info["content_type"] == "Person":
info["flavor"] = "tagged_profile"
else:
info["flavor"] = "tagged_other"
info["operation"] = "tagged"
info["tagname"] = event.name
events.push(**info)
开发者ID:hathawsh,项目名称:karl,代码行数:31,代码来源:contentfeeds.py
示例16: add_get_user
def add_get_user(event):
request = event.get('request') or threadlocal.get_current_request()
try:
event['user'] = request.user
except:
return
开发者ID:arthens,项目名称:triage,代码行数:7,代码来源:subscribers.py
示例17: resolve_output_to_url
def resolve_output_to_url(self, item):
request = get_current_request()
# Attempt to resolve the output item. First, resolve the item if its
# an asset spec. If it's a relative path, construct a url from the
# environment base url, which may be an asset spec that the base class
# cannot handle. Try to resolve this url and the item against the
# `static_url` method.
url = None
package, filepath = self._split_asset_spec(item)
if package is None:
if not path.isabs(filepath):
item = path.join(self.env.directory, filepath)
url = path.join(self.env.url, filepath)
else:
item = path.join(package, filepath)
if url is None:
url = super(PyramidResolver, self).resolve_output_to_url(item)
for attempt in (url, item):
try:
return request.static_url(attempt)
except:
pass
return url
开发者ID:aodag,项目名称:pyramid_webassets,代码行数:27,代码来源:__init__.py
示例18: get_profile
def get_profile(self, request=None):
"""
Returns AuthUser.profile object, creates record if it doesn't exist.
.. code-block:: python
from apex.models import AuthUser
user = AuthUser.get_by_id(1)
profile = user.get_profile(request)
in **development.ini**
.. code-block:: python
apex.auth_profile =
"""
if not request:
request = get_current_request()
auth_profile = request.registry.settings.get('apex.auth_profile')
if auth_profile:
resolver = DottedNameResolver(auth_profile.split('.')[0])
profile_cls = resolver.resolve(auth_profile)
return get_or_create(DBSession, profile_cls, auth_id=self.id)
开发者ID:akirisolutions-zz,项目名称:apex,代码行数:25,代码来源:models.py
示例19: check_password
def check_password(cls, **kwargs):
if kwargs.has_key('id'):
user = cls.get_by_id(kwargs['id'])
if kwargs.has_key('login'):
user = cls.get_by_login(kwargs['login'])
request = get_current_request()
use_sha256 = asbool(request.registry.settings.get('apex.use_sha256'))
if not user:
return False
try:
if use_sha256:
pw = kwargs['password']
if CRYPTPasswordManager(SHA256CRYPT).check(user.password, pw):
return True
else:
pw = kwargs['password'] + user.salt
if BCRYPTPasswordManager().check(user.password, pw):
return True
except TypeError:
pass
fallback_auth = request.registry.settings.get('apex.fallback_auth')
if fallback_auth:
resolver = DottedNameResolver(fallback_auth.split('.', 1)[0])
fallback = resolver.resolve(fallback_auth)
pw = kwargs['password']
return fallback().check(DBSession, request, user, pw)
return False
开发者ID:akirisolutions-zz,项目名称:apex,代码行数:32,代码来源:models.py
示例20: assetmutator_url
def assetmutator_url(path, **kw):
"""
Returns a Pyramid :meth:`~pyramid.request.Request.static_url` of the mutated
asset (and mutates the asset if needed).
:param path: The Pyramid asset path to process.
:type path: string - Required
:type mutator: dict or string - Optional
:param mutator: Allows you to override/specify a specific mutator to use
(e.g. ``coffee``), or assign a brand new mutator
dictionary to be used (e.g. ``{'cmd': 'lessc', 'ext':
'css'}``)
"""
request = get_current_request()
settings = request.registry.settings
mutant = Mutator(settings, path, **kw)
if not settings['assetmutator.each_request']:
if not mutant.mutated:
# TODO: Error?
pass
return request.static_url(mutant.new_path)
else:
if mutant.mutated:
return request.static_url(mutant.new_path)
else:
return request.static_url(mutant.process())
开发者ID:Enucatl,项目名称:pyramid_assetmutator,代码行数:30,代码来源:__init__.py
注:本文中的pyramid.threadlocal.get_current_request函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论