本文整理汇总了Python中sqlalchemy.util.get_cls_kwargs函数的典型用法代码示例。如果您正苦于以下问题：Python get_cls_kwargs函数的具体用法是什么？Python get_cls_kwargs怎么用？Python get_cls_kwargs有哪些使用示例？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_cls_kwargs函数的8个代码示例，这些示例默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: create
def create(self, name_or_url, **kwargs):
    """Build the dialect and the connection pool described by ``name_or_url``.

    Keyword arguments are consumed (popped) from ``kwargs`` as they are used:

    * every name reported by ``util.get_cls_kwargs`` for the dialect class
      is forwarded to the dialect constructor;
    * ``connect_args`` is merged into the connect parameters produced by
      ``dialect.create_connect_args``;
    * ``pool`` supplies a ready-made pool; when it is absent or ``None`` a
      pool is built from ``module``, ``creator``, ``poolclass`` and the pool
      class's own keyword arguments (``echo``, ``timeout`` and ``recycle``
      are read from ``echo_pool``, ``pool_timeout`` and ``pool_recycle``).

    Raises ``exceptions.InvalidRequestError`` when no DBAPI module is
    available for the dialect.

    NOTE(review): this excerpt stops right after the pool is constructed;
    the rest of the original function (whatever uses ``pool``) is presumably
    not shown here -- confirm against the upstream source.
    """
    # create url.URL object
    u = url.make_url(name_or_url)

    # get module from sqlalchemy.databases
    module = u.get_module()

    # consume dialect arguments from kwargs
    dialect_args = {}
    for k in util.get_cls_kwargs(module.dialect):
        if k in kwargs:
            dialect_args[k] = kwargs.pop(k)

    # create dialect
    dialect = module.dialect(**dialect_args)

    # assemble connection arguments
    (cargs, cparams) = dialect.create_connect_args(u)
    cparams.update(kwargs.pop('connect_args', {}))

    # look for existing pool or create
    pool = kwargs.pop('pool', None)
    if pool is None:
        # the default is evaluated eagerly, exactly as in the original call
        dbapi = kwargs.pop('module', dialect.dbapi())
        if dbapi is None:
            raise exceptions.InvalidRequestError("Cant get DBAPI module for dialect '%s'" % dialect)

        def connect():
            try:
                return dbapi.connect(*cargs, **cparams)
            # "except ... as" works on Python 2.6+ and Python 3; the old
            # "except Exception, e" form is a syntax error on Python 3.
            except Exception as e:
                raise exceptions.DBAPIError("Connection failed", e)
        creator = kwargs.pop('creator', connect)

        poolclass = kwargs.pop('poolclass', getattr(module, 'poolclass', poollib.QueuePool))

        # pool constructor argument -> create() keyword; built once rather
        # than on every loop iteration
        translate = {'echo': 'echo_pool', 'timeout': 'pool_timeout', 'recycle': 'pool_recycle'}

        # consume pool arguments from kwargs, translating a few of the arguments
        pool_args = {}
        for k in util.get_cls_kwargs(poolclass):
            tk = translate.get(k, k)
            if tk in kwargs:
                pool_args[k] = kwargs.pop(tk)
        pool_args['use_threadlocal'] = self.pool_threadlocal()
        pool = poolclass(creator, **pool_args)
开发者ID:nakedible,项目名称:vpnease-l2tp,代码行数:43,代码来源:strategies.py
示例2: mapper
def mapper(self, *args, **kwargs):
    """Call ``sqlalchemy.orm.mapper`` with this object's extension attached.

    Keyword arguments whose names are reported by ``get_cls_kwargs`` for
    ``_ScopedExt`` are removed from ``kwargs``. When any were supplied, the
    extension appended is ``self.extension.configure(**those_args)``;
    otherwise ``self.extension`` itself is appended. The remaining arguments
    are passed through to ``mapper`` and its result is returned.
    """
    from sqlalchemy.orm import mapper

    # pull the extension-specific options out of the mapper keyword arguments
    extension_args = {}
    for name in get_cls_kwargs(_ScopedExt):
        if name in kwargs:
            extension_args[name] = kwargs.pop(name)

    # normalise any caller-supplied extension(s) to a list we can append to
    extension = to_list(kwargs.get("extension", []))
    kwargs["extension"] = extension

    if not extension_args:
        extension.append(self.extension)
    else:
        extension.append(self.extension.configure(**extension_args))

    return mapper(*args, **kwargs)
开发者ID:pombredanne,项目名称:sqlalchemy-patches,代码行数:13,代码来源:scoping.py
示例3: _update_options
def _update_options(self, options):
    """Split ``options`` into session options and engine options.

    Entries whose names are reported by ``get_cls_kwargs(Session)`` are
    popped out of ``options`` and stored on ``self.session_options``; what
    is left is stored on ``self.engine_options``. Missing entries are filled
    with defaults: ``echo=False`` and ``convert_unicode=True`` on the engine
    side, ``autoflush=True``, ``autocommit=False`` and
    ``query_cls=BaseQuery`` on the session side.

    The ``options`` dict passed in is modified in place and kept by
    reference as ``self.engine_options``.
    """
    session_options = dict(
        (name, options.pop(name))
        for name in get_cls_kwargs(Session)
        if name in options
    )

    # whatever remains belongs to the engine
    for key, default in (('echo', False), ('convert_unicode', True)):
        options.setdefault(key, default)
    self.engine_options = options

    for key, default in (
        ('autoflush', True),
        ('autocommit', False),
        ('query_cls', BaseQuery),
    ):
        session_options.setdefault(key, default)
    self.session_options = session_options
开发者ID:jpscaletti,项目名称:sqlalchemy-wrapper,代码行数:15,代码来源:core.py
示例4: create
def create(self, name_or_url, executor, **kwargs):
    """Return a ``MockEngineStrategy.MockConnection`` for ``name_or_url``.

    The URL is parsed with ``url.make_url`` and its dialect class is
    instantiated with those entries of ``kwargs`` whose names are reported
    by ``util.get_cls_kwargs`` for that class (they are popped from
    ``kwargs``). Any other keyword arguments are left untouched and unused.
    The resulting dialect and ``executor`` are handed to ``MockConnection``.
    """
    parsed_url = url.make_url(name_or_url)
    dialect_cls = parsed_url.get_dialect()

    # keep only the keyword arguments the dialect constructor understands
    dialect_args = dict(
        (key, kwargs.pop(key))
        for key in util.get_cls_kwargs(dialect_cls)
        if key in kwargs
    )

    dialect = dialect_cls(**dialect_args)
    return MockEngineStrategy.MockConnection(dialect, executor)
开发者ID:ITBuddha,项目名称:geeknote,代码行数:16,代码来源:strategies.py
示例5: poolclass
'reset_on_return':'pool_reset_on_return'}
for k in util.get_cls_kwargs(poolclass):
tk = translate.get(k, k)
if tk in kwargs:
pool_args[k] = kwargs.pop(tk)
pool = poolclass(creator, **pool_args)
else:
if isinstance(pool, poollib._DBProxy):
pool = pool.get_pool(*cargs, **cparams)
else:
pool = pool
# create engine.
engineclass = self.engine_cls
engine_args = {}
for k in util.get_cls_kwargs(engineclass):
if k in kwargs:
engine_args[k] = kwargs.pop(k)
_initialize = kwargs.pop('_initialize', True)
# all kwargs should be consumed
if kwargs:
raise TypeError(
"Invalid argument(s) %s sent to create_engine(), "
"using configuration %s/%s/%s. Please check that the "
"keyword arguments are appropriate for this combination "
"of components." % (','.join("'%s'" % k for k in kwargs),
dialect.__class__.__name__,
pool.__class__.__name__,
engineclass.__name__))
开发者ID:ITBuddha,项目名称:geeknote,代码行数:31,代码来源:strategies.py
示例6: test
def test(cls, *expected):
    """Check that ``util.get_cls_kwargs(cls)`` yields exactly ``expected``.

    Both sides are compared as sets via ``eq_``, so order and duplicates
    are ignored.
    """
    found = set(util.get_cls_kwargs(cls))
    wanted = set(expected)
    eq_(found, wanted)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:2,代码来源:test_utils.py
示例7: create
def create(self, name_or_url, **kwargs):
# create url.URL object
u = url.make_url(name_or_url)
dialect_cls = u.get_dialect()
dialect_args = {}
# consume dialect arguments from kwargs
for k in util.get_cls_kwargs(dialect_cls):
if k in kwargs:
dialect_args[k] = kwargs.pop(k)
dbapi = kwargs.pop('module', None)
if dbapi is None:
dbapi_args = {}
for k in util.get_func_kwargs(dialect_cls.dbapi):
if k in kwargs:
dbapi_args[k] = kwargs.pop(k)
dbapi = dialect_cls.dbapi(**dbapi_args)
dialect_args['dbapi'] = dbapi
# create dialect
dialect = dialect_cls(**dialect_args)
# assemble connection arguments
(cargs, cparams) = dialect.create_connect_args(u)
cparams.update(kwargs.pop('connect_args', {}))
# look for existing pool or create
pool = kwargs.pop('pool', None)
if pool is None:
def connect():
try:
return dialect.connect(*cargs, **cparams)
except Exception as e:
invalidated = dialect.is_disconnect(e, None, None)
util.raise_from_cause(
exc.DBAPIError.instance(None, None,
e, dialect.dbapi.Error,
connection_invalidated=invalidated
)
)
creator = kwargs.pop('creator', connect)
poolclass = kwargs.pop('poolclass', None)
if poolclass is None:
poolclass = dialect_cls.get_pool_class(u)
pool_args = {}
# consume pool arguments from kwargs, translating a few of
# the arguments
translate = {'logging_name': 'pool_logging_name',
'echo': 'echo_pool',
'timeout': 'pool_timeout',
'recycle': 'pool_recycle',
'events': 'pool_events',
'use_threadlocal': 'pool_threadlocal',
'reset_on_return': 'pool_reset_on_return'}
for k in util.get_cls_kwargs(poolclass):
tk = translate.get(k, k)
if tk in kwargs:
pool_args[k] = kwargs.pop(tk)
pool = poolclass(creator, **pool_args)
else:
if isinstance(pool, poollib._DBProxy):
pool = pool.get_pool(*cargs, **cparams)
else:
pool = pool
# create engine.
engineclass = self.engine_cls
engine_args = {}
for k in util.get_cls_kwargs(engineclass):
if k in kwargs:
engine_args[k] = kwargs.pop(k)
_initialize = kwargs.pop('_initialize', True)
# all kwargs should be consumed
if kwargs:
raise TypeError(
"Invalid argument(s) %s sent to create_engine(), "
"using configuration %s/%s/%s. Please check that the "
"keyword arguments are appropriate for this combination "
"of components." % (','.join("'%s'" % k for k in kwargs),
dialect.__class__.__name__,
pool.__class__.__name__,
engineclass.__name__))
engine = engineclass(pool, dialect, u, **engine_args)
if _initialize:
do_on_connect = dialect.on_connect()
if do_on_connect:
def on_connect(dbapi_connection, connection_record):
conn = getattr(
dbapi_connection, '_sqla_unwrap', dbapi_connection)
if conn is None:
#.........这里部分代码省略.........
开发者ID:aburan28,项目名称:sqlalchemy,代码行数:101,代码来源:strategies.py
示例8: create
def create(self, name_or_url, **kwargs):
# create url.URL object
u = url.make_url(name_or_url)
plugins = u._instantiate_plugins(kwargs)
u.query.pop('plugin', None)
entrypoint = u._get_entrypoint()
dialect_cls = entrypoint.get_dialect_cls(u)
if kwargs.pop('_coerce_config', False):
def pop_kwarg(key, default=None):
value = kwargs.pop(key, default)
if key in dialect_cls.engine_config_types:
value = dialect_cls.engine_config_types[key](value)
return value
else:
pop_kwarg = kwargs.pop
dialect_args = {}
# consume dialect arguments from kwargs
for k in util.get_cls_kwargs(dialect_cls):
if k in kwargs:
dialect_args[k] = pop_kwarg(k)
dbapi = kwargs.pop('module', None)
if dbapi is None:
dbapi_args = {}
for k in util.get_func_kwargs(dialect_cls.dbapi):
if k in kwargs:
dbapi_args[k] = pop_kwarg(k)
dbapi = dialect_cls.dbapi(**dbapi_args)
dialect_args['dbapi'] = dbapi
# create dialect
dialect = dialect_cls(**dialect_args)
# assemble connection arguments
(cargs, cparams) = dialect.create_connect_args(u)
cparams.update(pop_kwarg('connect_args', {}))
cargs = list(cargs) # allow mutability
# look for existing pool or create
pool = pop_kwarg('pool', None)
if pool is None:
def connect(connection_record=None):
if dialect._has_events:
for fn in dialect.dispatch.do_connect:
connection = fn(
dialect, connection_record, cargs, cparams)
if connection is not None:
return connection
return dialect.connect(*cargs, **cparams)
creator = pop_kwarg('creator', connect)
poolclass = pop_kwarg('poolclass', None)
if poolclass is None:
poolclass = dialect_cls.get_pool_class(u)
pool_args = {}
# consume pool arguments from kwargs, translating a few of
# the arguments
translate = {'logging_name': 'pool_logging_name',
'echo': 'echo_pool',
'timeout': 'pool_timeout',
'recycle': 'pool_recycle',
'events': 'pool_events',
'use_threadlocal': 'pool_threadlocal',
'reset_on_return': 'pool_reset_on_return'}
for k in util.get_cls_kwargs(poolclass):
tk = translate.get(k, k)
if tk in kwargs:
pool_args[k] = pop_kwarg(tk)
pool = poolclass(creator, **pool_args)
else:
if isinstance(pool, poollib._DBProxy):
pool = pool.get_pool(*cargs, **cparams)
else:
pool = pool
# create engine.
engineclass = self.engine_cls
engine_args = {}
for k in util.get_cls_kwargs(engineclass):
if k in kwargs:
engine_args[k] = pop_kwarg(k)
_initialize = kwargs.pop('_initialize', True)
# all kwargs should be consumed
if kwargs:
raise TypeError(
"Invalid argument(s) %s sent to create_engine(), "
"using configuration %s/%s/%s. Please check that the "
"keyword arguments are appropriate for this combination "
"of components." % (','.join("'%s'" % k for k in kwargs),
dialect.__class__.__name__,
#.........这里部分代码省略.........
开发者ID:CyberCollins,项目名称:sqlalchemy,代码行数:101,代码来源:strategies.py
注：本文中的sqlalchemy.util.get_cls_kwargs函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论