本文整理汇总了Python中utils.storage函数的典型用法代码示例。如果您正苦于以下问题:Python storage函数的具体用法?Python storage怎么用?Python storage使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了storage函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: forms
def forms():
    """Parse the request body into form data.

    The body is read through ``data()`` rather than assumed to be a POST
    payload, so it may equally come from a PUT request.

    Returns:
        * ``multipart/*`` body: a ``storage`` mapping every field name to its
          value. Plain fields are reduced to their ``.value``, uploaded files
          are kept as ``cgi.FieldStorage`` items, and a field that occurs
          several times becomes a list.
        * ``application/json`` body (with or without parameters such as
          ``; charset=utf-8``): whatever ``json.loads`` returns.
        * any other body: a ``storage`` built from ``parse_qsl`` of the body.

    Raises:
        The exception built by ``badrequest()`` when a JSON body cannot be
        decoded; ``ctx.status`` is set to "500 json load error" first.
    """
    ctype = ctx.get('CONTENT_TYPE', '').lower()
    # Compare only the media type, ignoring parameters like "; charset=utf-8".
    media_type = ctype.split(';', 1)[0].strip()

    def process_fieldstorage(fs):
        # Repeated fields arrive as a list of items; unwrap each one.
        if isinstance(fs, list):
            return [process_fieldstorage(x) for x in fs]
        elif fs.filename is None:
            return fs.value
        else:
            # Keep file uploads intact so the caller can read the file.
            return fs

    if ctype.startswith('multipart/'):
        # Build a safe environment for cgi
        safe_env = {'QUERY_STRING': ''}
        for key in ('REQUEST_METHOD', 'CONTENT_TYPE', 'CONTENT_LENGTH'):
            if key in ctx.environ:
                safe_env[key] = ctx.environ[key]
        fields = cgi.FieldStorage(fp=StringIO(data()), environ=safe_env,
                                  keep_blank_values=True)
        # FieldStorage.list is a plain list (or None), not a mapping, so the
        # fields have to be looked up by name; keys() fails while list is None.
        names = fields.keys() if fields.list is not None else []
        return storage([(name, process_fieldstorage(fields[name]))
                        for name in names])
    elif media_type == 'application/json':
        try:
            return json.loads(data())
        except (ValueError, TypeError):
            # Only decoding failures are translated; anything else propagates.
            ctx.status = "500 json load error"
            raise badrequest()
    else:
        # pairs = parse_qsl(tonat(self._get_body_string(), 'latin1'))
        # (bottle's tonat conversion is deliberately not applied here)
        return storage(parse_qsl(data()))
开发者ID:ChellsChen,项目名称:hail,代码行数:31,代码来源:webapi.py
示例2: get_tokens
def get_tokens(text):
    """Yield the Python tokens of `text` as storage objects.

    Every yielded object carries `type`, `value`, `begin` and `end`. Whenever
    a token does not start where the previous one ended, an extra token of
    type -1 holding the skipped slice of `text` is yielded first.
    """
    feed = iter([text]).next
    last_end = None
    for raw in tokenize.generate_tokens(feed):
        kind, value, start, stop = raw[0], raw[1], raw[2], raw[3]
        if last_end is not None and last_end != start:
            # Only the column part of each (row, column) position is used.
            skipped = text[last_end[1]:start[1]]
            yield storage(type=-1, value=skipped, begin=last_end, end=start)
        last_end = stop
        yield storage(type=kind, value=value, begin=start, end=stop)
开发者ID:wangfeng3769,项目名称:remotebox,代码行数:11,代码来源:template.py
示例3: __init__
def __init__(self, name, **kwargs):
    """Set up a facet called `name`; every setting can be overridden by keyword.

    Keywords read: sql_col, sql_table, sql_query, attr, func, group_sort,
    set_select, sph_field, order_by, max_num_values, cutoff, augment, cl, db.
    """
    option = kwargs.get
    self.name = name

    # sql parameters
    self._sql_col = option("sql_col", name)
    self._sql_table = option("sql_table", name + "_terms")
    default_query = None
    if self._sql_table:
        select_clause = "select %s from %s " % (self._sql_col, self._sql_table)
        default_query = select_clause + "where id in ($id) order by field(id, $id)"
    self._sql_query = option("sql_query", default_query)

    # sphinx variables
    self._attr = option("attr", self._sql_col + "_attr")
    self._func = option("func", sphinxapi.SPH_GROUPBY_ATTR)
    self._group_sort = option("group_sort", "@count desc")
    self._set_select = option("set_select", "@groupby, @count")
    self._sph_field = option("sph_field", name)

    # facets variables
    self._order_by = option("order_by", lambda v: v["@term"])
    self._order_by_desc = False
    self._max_num_values = option("max_num_values", 15)
    self._cutoff = option("cutoff", 0)
    self._augment = option("augment", True)

    # sphinx and db clients (None unless supplied)
    self._cl = option("cl")
    self._db = option("db")

    # the returning values, starting out empty
    self.results = utils.storage(time=0, total_found=0, error="", warning="", matches=[])
开发者ID:Tankiit,项目名称:fSphinx,代码行数:34,代码来源:facets.py
示例4: test
def test():
    """Run the built-in Template checks.

    Each check renders a template and compares the output with the expected
    text. Passing '-v' on the command line prints a dot to stderr per passed
    check; a mismatch fails with an assertion showing both values.
    """
    import sys
    verbose = '-v' in sys.argv

    def assertEqual(a, b):
        if not (a == b):
            assert a == b, "\nexpected: %s\ngot: %s" % (repr(b), repr(a))
        elif verbose:
            sys.stderr.write('.')
            sys.stderr.flush()

    from utils import storage, group
    t = Template
    # Flat list of alternating (callable, expected output) entries.
    cases = [
        lambda: t('1')(), '1\n',
        lambda: t('$def with ()\n1')(), '1\n',
        lambda: t('$def with (a)\n$a')(1), '1\n',
        lambda: t('$def with (a=0)\n$a')(1), '1\n',
        lambda: t('$def with (a=0)\n$a')(a=1), '1\n',
        lambda: t('$if 1: 1')(), '1\n',
        lambda: t('$if 1:\n 1')(), '1\n',
        lambda: t('$if 0: 0\n$elif 1: 1')(), '1\n',
        lambda: t('$if 0: 0\n$elif None: 0\n$else: 1')(), '1\n',
        lambda: t('$if (0 < 1) and (1 < 2): 1')(), '1\n',
        lambda: t('$for x in [1, 2, 3]: $x')(), '1\n2\n3\n',
        lambda: t('$for x in []: 0\n$else: 1')(), '1\n',
        lambda: t('$def with (a)\n$while a and a.pop(): 1')([1, 2, 3]), '1\n1\n1\n',
        lambda: t('$while 0: 0\n$else: 1')(), '1\n',
        lambda: t('$ a = 1\n$a')(), '1\n',
        lambda: t('$# 0')(), '',
        lambda: t('$def with (d)\n$for k, v in d.iteritems(): $k')({1: 1}), '1\n',
        lambda: t('$def with (a)\n$(a)')(1), '1\n',
        lambda: t('$def with (a)\n$a')(1), '1\n',
        lambda: t('$def with (a)\n$a.b')(storage(b=1)), '1\n',
        lambda: t('$def with (a)\n$a[0]')([1]), '1\n',
        lambda: t('${0 or 1}')(), '1\n',
        lambda: t('$ a = [1]\n$a[0]')(), '1\n',
        lambda: t('$ a = {1: 1}\n$a.keys()[0]')(), '1\n',
        lambda: t('$ a = []\n$if not a: 1')(), '1\n',
        lambda: t('$ a = {}\n$if not a: 1')(), '1\n',
        lambda: t('$ a = -1\n$a')(), '-1\n',
        lambda: t('$ a = "1"\n$a')(), '1\n',
        lambda: t('$if 1 is 1: 1')(), '1\n',
        lambda: t('$if not 0: 1')(), '1\n',
        lambda: t('$if 1:\n $if 1: 1')(), '1\n',
        lambda: t('$ a = 1\n$a')(), '1\n',
        lambda: t('$ a = 1.\n$a')(), '1.0\n',
        lambda: t('$({1: 1}.keys()[0])')(), '1\n',
        lambda: t('$for x in [1, 2, 3]:\n\t$x') (), ' 1\n 2\n 3\n',
        lambda: t('$def with (a)\n$:a')(1), '1\n',
    ]
    for render, expected in group(cases, 2):
        assertEqual(render(), expected)

    # A `$var` line produces no output but is exposed as an attribute.
    rendered = Template("$var foo: bar")()
    assertEqual(str(rendered), '')
    assertEqual(rendered.foo, 'bar\n')

    if verbose:
        sys.stderr.write('\n')
开发者ID:jcodagnone,项目名称:inmuebles,代码行数:60,代码来源:template.py
示例5: get_tokens
def get_tokens(text):
    """Tokenize `text` with the Python tokenizer, keeping the gaps.

    The tokenizer drops the spaces between tokens, which can matter to the
    caller, so a dummy token of type -1 is emitted for every stretch of text
    that lies between the end of one token and the beginning of the next.

    Each token is a storage object with `type`, `value`, `begin` and `end`.
    """
    source = iter([text])
    cursor = None
    for token in tokenize.generate_tokens(source.next):
        current = storage(type=token[0], value=token[1], begin=token[2], end=token[3])
        has_gap = not (cursor is None or cursor == current.begin)
        if has_gap:
            # Positions are (row, column) pairs; slice by column only.
            (_, left), (_, right) = cursor, current.begin
            yield storage(type=-1, value=text[left:right], begin=cursor, end=current.begin)
        cursor = current.end
        yield current
开发者ID:dsc,项目名称:webpy,代码行数:16,代码来源:template.py
示例6: internal
def internal(*a, **kw):
    """Run `func` in a new thread and redirect to a URL carrying its id.

    `func` comes from the enclosing scope. The calling thread's context is
    handed to the worker thread, while the calling thread continues with a
    copy of it. The worker is registered in `background.threaddb` under
    `id(thread)` and the same id is put into the `_t` query parameter.
    """
    web.data()  # cache it
    caller = threading.currentThread()
    request_ctx = web._context[caller]
    web._context[caller] = utils.storage(web.ctx.copy())

    def newfunc():
        worker = threading.currentThread()
        web._context[worker] = request_ctx
        # Create new db cursor if there is one else background thread
        # overwrites foreground cursor causing rubbish data into dbase
        if web.config.get('db_parameters'):
            import db
            db.connect(**web.config.db_parameters)
        func(*a, **kw)
        # Afterwards keep only the response-related entries of the context.
        worker_ctx = web._context[worker]
        kept = ('status', 'headers', 'output')
        for key in worker_ctx.keys():
            if key in kept:
                continue
            try:
                del worker_ctx[key]
            except KeyError:
                pass

    worker_thread = threading.Thread(target=newfunc)
    job_id = id(worker_thread)
    background.threaddb[job_id] = worker_thread
    worker_thread.start()
    web.ctx.headers = []
    return seeother(changequery(_t=job_id))
开发者ID:1ngmar,项目名称:ianki,代码行数:25,代码来源:http.py
示例7: query
def query(self, sql_query, vars=None, processed=False, _test=False):
    """Execute `sql_query` and return its rows, or the row count.

    Unless `processed` is true or `sql_query` already is an `SQLQuery`, the
    query is first interpolated with `vars` through `reparam`. With
    `_test=True` the prepared query is returned without being executed.

    For statements that produce a result set, the return value is an
    `iterbetter` over storage rows that additionally offers `list()` and a
    `fields` list of column names; otherwise it is the cursor's `rowcount`.
    The context is committed when no transaction is open.
    """
    if vars is None:
        vars = {}
    if not (processed or isinstance(sql_query, SQLQuery)):
        sql_query = reparam(sql_query, vars)
    if _test:
        return sql_query

    cursor = self._db_cursor()
    self._db_execute(cursor, sql_query)

    if not cursor.description:
        result = cursor.rowcount
    else:
        columns = [column[0] for column in cursor.description]

        def as_row(values):
            return storage(dict(zip(columns, values)))

        def iterwrapper():
            while True:
                record = cursor.fetchone()
                if not record:
                    break
                yield as_row(record)

        result = iterbetter(iterwrapper())
        result.__len__ = lambda: int(cursor.rowcount)
        # data, fields
        result.list = lambda: [as_row(record) for record in cursor.fetchall()]
        # table fields
        result.fields = columns

    if not self.ctx.transactions:
        self.ctx.commit()
    return result
开发者ID:fireyy,项目名称:spyder,代码行数:33,代码来源:db.py
示例8: _SetValues
def _SetValues(self, query, sphinx_results, db):
    """Used internally to fill `self.results` with the facet terms and stats.

    The terms are fetched from `db` with this facet's sql query, keyed by the
    "@groupby" attribute of each sphinx match.
    """
    # start again from empty values and stats
    self.results = utils.storage(time=0, total_found=0, error="", warning="", matches=[])

    # fetch the facet terms from the db
    def groupby_of(match):
        return match["attrs"]["@groupby"]

    fetcher = DBFetch(db, self._sql_query, getter=groupby_of)
    hits = fetcher.Fetch(sphinx_results)

    # copy every stat except the matches themselves
    stat_names = [key for key in self.results.keys() if key != "matches"]
    for stat in stat_names:
        self.results[stat] = hits[stat]

    # build one facet value per match
    for match in hits.matches:
        # all virtual attributes (those starting with "@")
        value = {}
        for attr, attr_value in match["attrs"].items():
            if attr.startswith("@"):
                value[attr] = attr_value
        # the facet term is the last entry of the fetched hit
        fetched = match["@hit"]
        value["@term"] = fetched[fetched.keys()[-1]]
        # the value of the grouping func, defaulting to the count
        value["@groupfunc"] = value.get("@groupfunc", value["@count"])
        # whether the facet has been selected in the query
        selector = "@%s %s" % (self._sph_field, value["@term"])
        value["@selected"] = selector in query
        self.results.matches.append(value)
开发者ID:bryanwillis1,项目名称:fSphinx,代码行数:27,代码来源:facets.py
示例9: __init__
def __init__(self, app, store, initializer=None):
    """Create a session on top of `store` and hook it into `app`, if given.

    The attributes are written straight into the instance `__dict__`,
    presumably because the class customises attribute assignment -- confirm
    against the rest of the class.
    """
    state = self.__dict__
    state['store'] = store
    state['_initializer'] = initializer
    state['_last_cleanup_time'] = 0
    state['_config'] = utils.storage(web.config.session_parameters)

    if not app:
        return
    app.add_processor(self._processor)
开发者ID:Manchester412,项目名称:socorro,代码行数:8,代码来源:session.py
示例10: __init__
def __init__(self, app, store, initializer=None):
    """Create a session on top of `store` and hook it into `app`, if given.

    The session settings are read from `web.config.session_parameters` into
    a new storage object at construction time.
    """
    self.store, self._initializer, self._last_cleanup_time = store, initializer, 0
    self._config = utils.storage(web.config.session_parameters)

    if not app:
        return
    app.add_processor(self._processor)
开发者ID:desertmonad,项目名称:webpy,代码行数:8,代码来源:session.py
示例11: _FetchInternal
def _FetchInternal(self, hits):
ids = [self._getter(m) for m in hits.matches]
if ids:
s_ids = ','.join(map(str, ids))
if not self._sql:
values = (storage(id=str(id)) for id in ids)
else:
values = self._db.query(self._sql.replace('$id', s_ids))
for hit, match in zip(values, hits.matches):
match['@hit'] = hit
hits.ids = ids
开发者ID:SergeyKubrak,项目名称:fSphinx,代码行数:11,代码来源:hits.py
示例12: __init__
def __init__(self, app, store, initializer=None):
    """Create a session on top of `store` and hook it into `app`, if given.

    Session data lives in a `utils.threadeddict`; the instance's item access
    methods are bound directly to that dict's methods.
    """
    self.store = store
    self._initializer = initializer
    self._config = utils.storage(web.config.session_parameters)
    self._data = utils.threadeddict()
    self._last_cleanup_time = 0

    # Forward item get/set/delete to the underlying data dict.
    data = self._data
    self.__getitem__ = data.__getitem__
    self.__setitem__ = data.__setitem__
    self.__delitem__ = data.__delitem__

    if not app:
        return
    app.add_processor(self._processor)
开发者ID:RhoninTodo,项目名称:patrick-webpy,代码行数:13,代码来源:session.py
示例13: _FetchInternal
def _FetchInternal(self, hits):
ids = [self._getter(m) for m in hits.matches]
if ids:
s_ids = ','.join(map(str, ids))
if not self._sql:
values = (utils.storage(id=str(id)) for id in ids)
else:
values = self._db.query(self._sql.replace('$id', s_ids))
for value, match in zip(values, hits.matches):
match['@hit'] = value
for p in self._post_processors:
p(hits)
hits.ids = ids
开发者ID:abhijo89,项目名称:fSphinx,代码行数:13,代码来源:hits.py
示例14: load
def load():
    """
    Set up a fresh context for the current thread.

    The context starts with status '200 OK' and no headers. A database
    connection is opened when `db_parameters` is configured. Functions
    registered in the dictionary `loadhooks` are run at the end.
    """
    _context[threading.currentThread()] = storage()
    ctx.status, ctx.headers = '200 OK', []

    if config.get('db_parameters'):
        import db
        db.connect(**config.db_parameters)

    for hook in loadhooks.values():
        hook()
开发者ID:stucchio,项目名称:DJ-Pirate,代码行数:15,代码来源:webapi.py
示例15: __init__
def __init__(self, app, store, initializer=None):
    """Create a session on top of `store` and hook it into `app`, if given.

    Args:
        app: application to register the session processor with; skipped
            when falsy.
        store: backing store kept on the instance.
        initializer: optional initial session data. When it is a dict
            containing 'gpperfmon_instance_name', the global session cookie
            name is changed to 'gpperfmon_instance_<name>' before the
            configuration is copied.

    The attributes are written straight into the instance `__dict__`,
    presumably because the class customises attribute assignment -- confirm
    against the rest of the class.
    """
    self.__dict__['store'] = store
    self.__dict__['_initializer'] = initializer
    self.__dict__['_last_cleanup_time'] = 0
    # GREENPLUM - to allow multiple instances to run
    # `initializer` defaults to None and may also be a non-dict, so check the
    # type before looking for the key instead of calling has_key() blindly.
    if isinstance(initializer, dict) and 'gpperfmon_instance_name' in initializer:
        # NOTE: this changes the shared web.config.session_parameters.
        web.config.session_parameters['cookie_name'] = (
            'gpperfmon_instance_%s' % initializer['gpperfmon_instance_name']
        )
    # END GREENPLUM CHANGE
    self.__dict__['_config'] = utils.storage(web.config.session_parameters)
    if app:
        app.add_processor(self._processor)
开发者ID:macroyuyang,项目名称:card,代码行数:15,代码来源:session.py
示例16: query
def query(self, sql_query, vars=None, processed=False, _test=False, _rawdata=False, _rawcur=False):
    """
    Run the SQL query `sql_query`, interpolating it with the dictionary `vars`.

    If `processed=True`, `vars` is a `reparam`-style list to use
    instead of interpolating.

    With `_rawcur=True` the cursor is returned right after execution. With
    `_rawdata=True` rows are handed out as the cursor produced them instead
    of being wrapped in storage objects, and the column names are available
    as `names` on the result.

        >>> db = DB(None, {})
        >>> db.query("SELECT * FROM foo", _test=True)
        <sql: 'SELECT * FROM foo'>
        >>> db.query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
        >>> db.query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
    """
    if vars is None:
        vars = {}
    if not (processed or isinstance(sql_query, SQLQuery)):
        sql_query = reparam(sql_query, vars)
    if _test:
        return sql_query

    cursor = self._db_cursor()
    self._db_execute(cursor, sql_query)
    if _rawcur:
        return cursor

    if not cursor.description:
        result = cursor.rowcount
    else:
        columns = [column[0] for column in cursor.description]

        def convert(record):
            if _rawdata:
                return record
            return storage(dict(zip(columns, record)))

        def iterwrapper():
            while True:
                record = cursor.fetchone()
                if not record:
                    break
                yield convert(record)

        result = iterbetter(iterwrapper())
        result.__len__ = lambda: int(cursor.rowcount)
        result.list = lambda: [convert(record) for record in cursor.fetchall()]
        if _rawdata:
            result.names = columns

    if not self.ctx.transactions:
        self.ctx.commit()
    return result
开发者ID:kindy61,项目名称:webpy,代码行数:47,代码来源:db.py
示例17: rawinput
def rawinput(method=None):
    """Return a storage object holding the GET and/or POST arguments.

    `method` may be "get", "post", "put" or "both" (the default when it is
    empty); it is compared case-insensitively.
    """
    method = method or "both"
    from cStringIO import StringIO
    wanted = method.lower()

    def process_fieldstorage(fs):
        if isinstance(fs, list):
            return [process_fieldstorage(item) for item in fs]
        if fs.filename is None:
            return fs.value
        return fs

    def dictify(fs):
        # hack to make web.input work with enctype='text/plain.
        if fs.list is None:
            fs.list = []
        return dict((key, fs[key]) for key in fs.keys())

    env = ctx.env.copy()
    body_args = {}
    query_args = {}

    if wanted in ['both', 'post', 'put'] and env['REQUEST_METHOD'] in ['POST', 'PUT']:
        is_multipart = env.get('CONTENT_TYPE', '').lower().startswith('multipart/')
        if is_multipart:
            # since wsgi.input is directly passed to cgi.FieldStorage,
            # it can not be called multiple times. Saving the FieldStorage
            # object in ctx to allow calling web.input multiple times.
            body_args = ctx.get('_fieldstorage')
            if not body_args:
                body_args = cgi.FieldStorage(fp=env['wsgi.input'], environ=env, keep_blank_values=1)
                ctx._fieldstorage = body_args
        else:
            body_args = cgi.FieldStorage(fp=StringIO(data()), environ=env, keep_blank_values=1)
        body_args = dictify(body_args)

    if wanted in ['both', 'get']:
        # Force a GET so that FieldStorage parses the query string.
        env['REQUEST_METHOD'] = 'GET'
        query_args = dictify(cgi.FieldStorage(environ=env, keep_blank_values=1))

    merged = dictadd(query_args, body_args)
    return storage([(key, process_fieldstorage(value)) for key, value in merged.items()])
开发者ID:daimin,项目名称:tolog,代码行数:47,代码来源:webapi.py
示例18: internal
def internal(*a, **kw):
    """Run `func` in a new thread and redirect to a URL carrying its id.

    `func` comes from the enclosing scope. The calling thread's context is
    handed to the worker thread, while the calling thread continues with a
    copy of it. The worker is registered in `background.threaddb` under
    `id(thread)` and the same id is put into the `_t` query parameter.
    """
    web.data()  # cache it
    here = threading.currentThread()
    saved_ctx = web._context[here]
    web._context[here] = utils.storage(web.ctx.copy())

    def newfunc():
        me = threading.currentThread()
        web._context[me] = saved_ctx
        func(*a, **kw)
        # Drop everything from the context except the response parts.
        current = web._context[me]
        disposable = [k for k in current.keys()
                      if k not in ['status', 'headers', 'output']]
        for k in disposable:
            try:
                del current[k]
            except KeyError:
                pass

    worker = threading.Thread(target=newfunc)
    token = id(worker)
    background.threaddb[token] = worker
    worker.start()
    web.ctx.headers = []
    return seeother(changequery(_t=token))
开发者ID:Codesleuth,项目名称:rabbitmq-dotnet-client,代码行数:20,代码来源:http.py
示例19: rawinput
def rawinput(method=None):
    """Return a storage object holding the GET and/or POST arguments.

    Args:
        method: "get", "post", "put" or "both" (case-insensitive). A falsy
            value means "both".

    Returns:
        A storage built from the query-string and body arguments merged with
        ``dictadd``. Plain fields are reduced to their ``.value``, uploaded
        files stay ``cgi.FieldStorage`` items and repeated fields are lists.
    """
    method = method or "both"
    from cStringIO import StringIO

    def dictify(fs):
        # FieldStorage.keys() fails while .list is None, so normalise first.
        if fs.list is None:
            fs.list = []
        return dict([(k, fs[k]) for k in fs.keys()])

    def process_fieldstorage(fs):
        if isinstance(fs, list):
            return [process_fieldstorage(x) for x in fs]
        elif fs.filename is None:
            return fs.value
        else:
            return fs

    e = ctx.env.copy()
    a = b = {}

    if method.lower() in ['both', 'post', 'put']:
        if e['REQUEST_METHOD'] in ['POST', 'PUT']:
            if e.get('CONTENT_TYPE', '').lower().startswith('multipart/'):
                # wsgi.input is handed straight to cgi.FieldStorage and can
                # only be consumed once, so the parsed object is kept in ctx
                # and reused by later calls instead of being parsed again.
                a = ctx.get('_fieldstorage')
                if not a:
                    fp = e['wsgi.input']
                    a = cgi.FieldStorage(fp=fp, environ=e, keep_blank_values=1)
                    ctx._fieldstorage = a
            else:
                # Other bodies are parsed from data() on every call.
                fp = StringIO(data())
                a = cgi.FieldStorage(fp=fp, environ=e, keep_blank_values=1)
            a = dictify(a)

    # Fixed: this used to read the undefined name `methods`.
    if method.lower() in ['both', 'get']:
        # Force a GET so that FieldStorage parses the query string.
        e['REQUEST_METHOD'] = 'GET'
        b = dictify(cgi.FieldStorage(environ=e, keep_blank_values=1))

    return storage([(k, process_fieldstorage(v)) for k, v in dictadd(b, a).items()])
开发者ID:xuechao8086,项目名称:tinyhttpd,代码行数:37,代码来源:webapi.py
示例20: SessionExpired
# ==== [TW start] adding lock
import threading
# ==== [TW stop] adding lock
import utils
import webapi as web
# Names exported by this module.
__all__ = [
    "Session",
    "SessionExpired",
    "Store",
    "DiskStore",
    "DBStore",
]

# Session settings, published on the shared web.config object.
# NOTE(review): secret_key is a hard-coded literal; deployments presumably
# have to override it -- confirm.
web.config.session_parameters = utils.storage(
    dict(
        cookie_name="webpy_session_id",
        cookie_domain=None,
        cookie_path=None,
        timeout=24 * 60 * 60,  # 24 hours in seconds
        ignore_expiry=True,
        ignore_change_ip=True,
        secret_key="fLjUfxqXtfNoIldA0A0J",
        expired_message="Session expired",
        httponly=True,
        secure=False,
    )
)
class SessionExpired(web.HTTPError):
    """HTTP error carrying `message` as its body, sent with status "200 OK"."""

    def __init__(self, message):
        status, headers = "200 OK", {}
        web.HTTPError.__init__(self, status, headers, data=message)
class Session(object):
"""Session management for web.py
开发者ID:twatteyne,项目名称:dustlink_academy,代码行数:32,代码来源:session.py
注:本文中的utils.storage函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论