本文整理汇总了Python中sqlalchemy.dialects.registry.register函数的典型用法代码示例。如果您正苦于以下问题：Python register函数的具体用法？Python register怎么用？Python register使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了register函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_wrapper_hooks
def test_wrapper_hooks(self):
    """Check that a registered wrapper factory can hand back another dialect.

    A ``Mock`` is registered under the name "wrapperdialect"; its
    ``get_dialect_cls`` hook rewrites the URL to "sqlite" and returns the
    dialect that URL resolves to.  The resulting engine must expose the
    sqlite dialect, and the mock must have recorded exactly the
    ``get_dialect_cls`` and ``engine_created`` calls.
    """

    def redirect_to_sqlite(target_url):
        # Mutate the URL in place so the recorded call argument compares
        # equal to make_url("sqlite://") in the final assertion.
        target_url.drivername = "sqlite"
        return target_url.get_dialect()

    # The registry is given this module's name plus the attribute name
    # "WrapperFactory", so the mock has to live at module level.
    global WrapperFactory
    WrapperFactory = Mock()
    WrapperFactory.get_dialect_cls.side_effect = redirect_to_sqlite
    registry.register("wrapperdialect", __name__, "WrapperFactory")

    from sqlalchemy.dialects import sqlite

    engine = create_engine("wrapperdialect://")

    eq_(engine.dialect.name, "sqlite")
    assert isinstance(engine.dialect, sqlite.dialect)

    expected_calls = [
        call.get_dialect_cls(url.make_url("sqlite://")),
        call.engine_created(engine),
    ]
    eq_(WrapperFactory.mock_calls, expected_calls)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:25,代码来源:test_parseconnect.py
示例2: test_engine_from_config_custom
def test_engine_from_config_custom(self):
    """Check that dialect-declared config types coerce string settings.

    A ``MockDialect`` subclass declaring ``engine_config_types`` is published
    as this module's ``dialect`` attribute and registered under
    "mockdialect.barb".  ``engine_from_config`` is then expected to turn the
    string values "5" and "false" into ``5`` and ``False`` on the dialect.
    """
    from sqlalchemy import util

    # Split the dotted module name into (parent package path, last component).
    package, _, module = __name__.rpartition(".")

    class MyDialect(MockDialect):
        engine_config_types = {
            "foobar": int,
            "bathoho": util.bool_or_str("force"),
        }

        def __init__(self, foobar=None, bathoho=None, **kw):
            self.foobar = foobar
            self.bathoho = bathoho

    # The registration below names only a module, so the class is exposed
    # through the module-level name ``dialect``.
    global dialect
    dialect = MyDialect
    registry.register("mockdialect.barb", package, module)

    settings = {
        "sqlalchemy.url": "mockdialect+barb://",
        "sqlalchemy.foobar": "5",
        "sqlalchemy.bathoho": "false",
    }
    engine = engine_from_config(settings, _initialize=False)

    eq_(engine.dialect.foobar, 5)
    eq_(engine.dialect.bathoho, False)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:29,代码来源:test_parseconnect.py
示例3: test_register_legacy
def test_register_legacy(self):
    """Check registration that points at a module exposing ``dialect``.

    This module is registered under "mockdialect.foob" as a
    (parent package, module name) pair, with ``MockDialect`` published as
    the module-level ``dialect`` attribute.  An engine created from
    "mockdialect+foob://" must then use ``MockDialect``.
    """
    from sqlalchemy.dialects import registry

    # Split the dotted module name into (parent package path, last component).
    package, _, module = __name__.rpartition(".")

    global dialect
    dialect = MockDialect
    registry.register("mockdialect.foob", package, module)

    engine = create_engine("mockdialect+foob://")
    assert isinstance(engine.dialect, MockDialect)
开发者ID:kihon10,项目名称:sqlalchemy,代码行数:10,代码来源:test_parseconnect.py
示例4: setup_py_test
__author__ = 'kbroughton'
from sqlalchemy.dialects import registry
# Register the Vertica_pyodbc class from torpedo_vertica.pyodbc under both the
# bare "vertica" key and the dotted "vertica.pyodbc" key (the dotted form is
# what a "vertica+pyodbc://" URL is looked up as).
registry.register("vertica", "torpedo_vertica.pyodbc", "Vertica_pyodbc")
registry.register("vertica.pyodbc", "torpedo_vertica.pyodbc", "Vertica_pyodbc")

from torpedo.testing import runner

# use this in setup.py 'test_suite':
# test_suite="run_tests.setup_py_test"
# NOTE(review): the module path above says "run_tests"; confirm it matches
# this file's actual name.
def setup_py_test():
    """Entry point for setup.py's ``test_suite``; delegates to the runner."""
    runner.setup_py_test()


if __name__ == '__main__':
    runner.main()
开发者ID:darKoram,项目名称:torpedo,代码行数:16,代码来源:run_test.py
示例5:
from sqlalchemy.dialects import registry
# Register DrillDialect_pyodbc for both "drill://" (bare key) and
# "drill+pyodbc://" (dotted key) URLs.
registry.register("drill", "sqlalchemy_drill.pyodbc", "DrillDialect_pyodbc")
registry.register("drill.pyodbc", "sqlalchemy_drill.pyodbc", "DrillDialect_pyodbc")
from sqlalchemy.testing.plugin.pytestplugin import *
开发者ID:PythonicNinja,项目名称:sqlalchemy-drill,代码行数:7,代码来源:conftest.py
示例6:
from sqlalchemy.dialects import registry
# Bare "db2" key and the explicit "db2.ibm_db" key both map to the ibm_db class.
registry.register("db2", "ibm_db_sa.ibm_db", "DB2Dialect_ibm_db")
registry.register("db2.ibm_db", "ibm_db_sa.ibm_db", "DB2Dialect_ibm_db")
# Alternative drivers, each selected by its dotted "db2.<driver>" key.
registry.register("db2.pyodbc", "ibm_db_sa.pyodbc", "DB2Dialect_pyodbc")
registry.register("db2.zxjdbc", "ibm_db_sa.zxjdbc", "DB2Dialect_zxjdbc")
# The "...400" keys select the AS400 dialect classes from the same driver modules.
registry.register("db2.pyodbc400", "ibm_db_sa.pyodbc", "AS400Dialect_pyodbc")
registry.register("db2.zxjdbc400", "ibm_db_sa.zxjdbc", "AS400Dialect_zxjdbc")
from sqlalchemy.testing import runner
runner.main()
开发者ID:arielmakestuff,项目名称:python-ibmdbsa,代码行数:12,代码来源:run_tests.py
示例7: setup_py_test
#!/usr/bin/env python
from sqlalchemy.dialects import registry
from sqlalchemy.testing import runner
# Make "monetdb://" URLs resolve to MonetDialect in sqlalchemy_monetdb.dialect.
registry.register("monetdb", "sqlalchemy_monetdb.dialect", "MonetDialect")


def setup_py_test():
    """Delegate to the SQLAlchemy testing runner's ``setup_py_test`` hook."""
    runner.setup_py_test()


if __name__ == '__main__':
    runner.main()
开发者ID:aalexandrov,项目名称:sqlalchemy-monetdb,代码行数:15,代码来源:run_tests.py
示例8: AWVSResult
awvspy.awvsresult
~~~~~~~~~~~~~~~~~~
This module contains the awvs scan result.
"""
import sys
import logging
import hashlib
import datetime
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine
from awvspy.awvsmodels import WVSScans
from awvspy.dbapi import make_session_scope,get_session
# Register AccessDialect_pyodbc under the bare "access" key and the dotted
# "access.pyodbc" key; the latter serves "access+pyodbc://" URLs.
registry.register("access", "sqlalchemy_access.pyodbc", "AccessDialect_pyodbc")
registry.register("access.pyodbc", "sqlalchemy_access.pyodbc", "AccessDialect_pyodbc")
if sys.platform == 'win32':
import pyodbc
class AWVSResult(object):
    """Placeholder class for AWVS scan results; its constructor does nothing."""

    def __init__(self):
        pass
# Build an engine for the "vulnscanresults" target through the access+pyodbc
# dialect and obtain a session bound to it.
engine = create_engine("access+pyodbc://vulnscanresults")
session = get_session(engine=engine)
# NOTE: ``as session`` rebinds the name ``session`` to whatever the scope yields.
with make_session_scope(session) as session:
    # Load every WVSScans row.
    wvs_scans = session.query(WVSScans).all()
开发者ID:wcc526,项目名称:awvspy,代码行数:31,代码来源:awvsresult.py
示例9:
__version__ = '0.8'
from sqlalchemy.dialects import registry
# Register DrillDialect_sadrill for both "drill://" (bare key) and
# "drill+sadrill://" (dotted key) URLs.
registry.register("drill", "sqlalchemy_drill.sadrill", "DrillDialect_sadrill")
registry.register("drill.sadrill", "sqlalchemy_drill.sadrill", "DrillDialect_sadrill")
开发者ID:JohnOmernik,项目名称:sqlalchemy-drill,代码行数:5,代码来源:__init__.py
示例10: StrictVersion
from __future__ import absolute_import
from __future__ import unicode_literals
from distutils.version import StrictVersion
import sqlalchemy
# Only register the dialects when the installed SQLAlchemy is at least 0.9.4.
if StrictVersion(sqlalchemy.__version__) >= StrictVersion('0.9.4'):
    from sqlalchemy.dialects import registry
    registry.register("hive", "pyhive.sqlalchemy_hive", "HiveDialect")
    registry.register("presto", "pyhive.sqlalchemy_presto", "PrestoDialect")
from sqlalchemy.testing.plugin.pytestplugin import *
开发者ID:danieltahara,项目名称:PyHive,代码行数:12,代码来源:conftest.py
示例11: patch_all
def patch_all():
    """Register this package's dialects under the plain database names.

    For every entry of ``bundled_drivers`` the database name itself and each
    ``"<db>.<driver>"`` combination are registered against the
    "sqlalchemy_gevent" module, using ``dialect_name`` to pick the attribute.
    """
    target_module = "sqlalchemy_gevent"
    for backend, driver_names in bundled_drivers.items():
        registry.register(backend, target_module, dialect_name(backend))
        for driver_name in driver_names:
            key = "%s.%s" % (backend, driver_name)
            registry.register(
                key, target_module, dialect_name(backend, driver_name)
            )
开发者ID:hkwi,项目名称:sqlalchemy_gevent,代码行数:5,代码来源:sqlalchemy_gevent.py
示例12: globals
"__call__":dialect_init_wrap(tp_factory)(dialect)
})(dialect)
# Database name -> list of driver names to generate and register dialects for.
bundled_drivers = {
    "drizzle": ["mysqldb"],
    "firebird": ["kinterbasdb", "fdb"],
    "mssql": ["pyodbc", "adodbapi", "pymssql", "zxjdbc", "mxodbc"],
    "mysql": [
        "mysqldb",
        "oursql",
        "pyodbc",
        "zxjdbc",
        "mysqlconnector",
        "pymysql",
        "gaerdbms",
        "cymysql",
    ],
    "oracle": ["cx_oracle", "zxjdbc"],
    "postgresql": ["psycopg2", "pg8000", "pypostgresql", "zxjdbc"],
    "sqlite": ["pysqlite"],
    "sybase": ["pysybase", "pyodbc"],
}
# For every bundled database, create the dialect classes via dialect_maker,
# publish each one as a module global, and register it under a "gevent_"
# prefixed key that points at the "sqlalchemy_gevent" module.
for db, drivers in bundled_drivers.items():
    try:
        # Default (driver-less) dialect for this database.
        globals()[dialect_name(db)] = dialect_maker(db, None)
        registry.register("gevent_%s" % db, "sqlalchemy_gevent", dialect_name(db))
        # One dialect per explicitly named driver.
        for driver in drivers:
            globals()[dialect_name(db,driver)] = dialect_maker(db, driver)
            registry.register("gevent_%s.%s" % (db,driver), "sqlalchemy_gevent", dialect_name(db,driver))
    except ImportError:
        # drizzle was removed in sqlalchemy v1.0
        # An ImportError anywhere above abandons the remaining registrations
        # for this database and moves on to the next one.
        pass
def patch_all():
    """Register this package's dialects under the plain database names.

    For every entry of ``bundled_drivers`` the database name itself and each
    ``"<db>.<driver>"`` combination are registered against the
    "sqlalchemy_gevent" module, using ``dialect_name`` to pick the attribute.
    """
    target_module = "sqlalchemy_gevent"
    for backend, driver_names in bundled_drivers.items():
        registry.register(backend, target_module, dialect_name(backend))
        for driver_name in driver_names:
            key = "%s.%s" % (backend, driver_name)
            registry.register(
                key, target_module, dialect_name(backend, driver_name)
            )
开发者ID:hkwi,项目名称:sqlalchemy_gevent,代码行数:29,代码来源:sqlalchemy_gevent.py
示例13: DialectSQLAlchUsageTest
from sqlalchemy import *
from sqlalchemy.dialects import registry
from sqlalchemy_teradata.dialect import TeradataDialect
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.plugin.pytestplugin import *
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relation, sessionmaker
# Make the "tdalchemy" key resolve to TeradataDialect.
# NOTE(review): the engine URL built in setup() below uses the "teradata://"
# scheme, which is not the key registered here; confirm which name is intended.
registry.register("tdalchemy", "sqlalchemy_teradata.dialect", "TeradataDialect")
class DialectSQLAlchUsageTest(fixtures.TestBase):
""" This usage test is meant to serve as documentation and follows the
tutorial here: http://docs.sqlalchemy.org/en/latest/core/tutorial.html
but with the dialect being developed
"""
# Note: this test uses pytest which captures stdout by default, pass -s to disable
def setup(self):
self.dialect = TeradataDialect()
# add credentials here
passw = ""
user = ""
self.engine = create_engine('teradata://'+user+':'+passw+'@<host>:<port>')
self.conn = self.engine.connect()
# build a table with columns
self.metadata = MetaData()
self.users = Table('my_users', self.metadata,
开发者ID:RXCORE,项目名称:sqlalchemy-teradata,代码行数:31,代码来源:usage_test.py
示例14: register_fdw_support
def register_fdw_support(url):
class PGDialectFdw(url.get_dialect()):
"""A sqldialect based on psyopg2 for managing foreign tables
"""
ddl_compiler = PGDDLCompilerFdw
construct_arguments = [
(Table, {
"server": None,
"options": None
})
]
@reflection.cache
def get_primary_keys(self, connection, table_name, schema=None, **kw):
    """Return the primary-key column names of a table.

    :param connection: connection used to run the information_schema query.
    :param table_name: name of the table to inspect.
    :param schema: schema holding the table; when ``None`` the dialect's
        ``default_schema_name`` is used.
    :return: list of column names taken from the first column of each row.
    """
    current_schema = (
        schema if schema is not None else self.default_schema_name
    )

    PK_SQL = """
SELECT cu.column_name
FROM information_schema.table_constraints tc
INNER JOIN information_schema.key_column_usage cu
on cu.constraint_name = tc.constraint_name and
cu.table_name = tc.table_name and
cu.table_schema = tc.table_schema
WHERE cu.table_name = :table_name and
constraint_type = 'PRIMARY KEY'
and cu.table_schema = :schema;
"""
    # The typemap has to be keyed by the column the query actually selects
    # ("column_name"); the previous "attname" key matched nothing, so the
    # Unicode result type was never applied.
    t = sql.text(PK_SQL, typemap={'column_name': sqltypes.Unicode})
    c = connection.execute(t, table_name=table_name, schema=current_schema)
    return [r[0] for r in c.fetchall()]
@reflection.cache
def get_table_names(self, connection, schema=None, **kw):
    """Return the names of tables in a schema, foreign tables included.

    Selects ``pg_class`` rows whose ``relkind`` is ``'r'`` or ``'f'`` and
    whose namespace name equals the requested schema.

    :param connection: connection used to run the catalog query.
    :param schema: schema to list; when ``None`` the dialect's
        ``default_schema_name`` is used.
    :return: list of relation names.
    """
    current_schema = (
        schema if schema is not None else self.default_schema_name
    )

    # The schema name is passed as a bound parameter instead of being
    # %-interpolated into the SQL text, so names containing quotes can
    # neither break nor alter the statement.
    query = sql.text(
        "SELECT relname FROM pg_class c "
        "WHERE relkind in ('r', 'f') "
        "AND :schema = (select nspname from pg_namespace n "
        "where n.oid = c.relnamespace) ",
        typemap={'relname': sqltypes.Unicode}
    )
    result = connection.execute(query, schema=current_schema)
    return [row[0] for row in result]
@reflection.cache
def get_table_oid(self, connection, table_name, schema=None, **kw):
    """Look up the ``pg_class`` oid of ``schema.table_name``.

    The result is cached by ``reflection.cache`` so that the reflection
    methods needing the oid can share a single lookup.

    :param connection: connection used to run the catalog query.
    :param table_name: relation name to look up.
    :param schema: schema name; when ``None`` the relation must be visible
        according to ``pg_table_is_visible``.
    :return: the oid of the matching relation.
    :raises exc.NoSuchTableError: if no matching relation exists.
    """
    has_schema = schema is not None
    if has_schema:
        schema_where_clause = "n.nspname = :schema"
    else:
        schema_where_clause = "pg_catalog.pg_table_is_visible(c.oid)"

    query = """
SELECT c.oid
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE (%s)
AND c.relname = :table_name AND c.relkind in ('r', 'v', 'f')
""" % schema_where_clause

    # The parameters are bound with a Unicode type, so the values are
    # coerced with str() before being passed.
    table_name = str(table_name)
    bindparams = [sql.bindparam('table_name', type_=sqltypes.Unicode)]
    if has_schema:
        schema = str(schema)
        bindparams.append(sql.bindparam('schema', type_=sqltypes.Unicode))

    statement = sql.text(
        query, bindparams=bindparams, typemap={'oid': sqltypes.Integer})
    table_oid = connection.execute(
        statement, table_name=table_name, schema=schema
    ).scalar()
    if table_oid is None:
        raise exc.NoSuchTableError(table_name)
    return table_oid
@reflection.cache
def get_foreign_table_options(self, connection, pgfdw_table):
oid = self.get_table_oid(connection, pgfdw_table.name, pgfdw_table.schema)
query = """
SELECT ftoptions, srvname
FROM pg_foreign_table t inner join pg_foreign_server s
ON t.ftserver = s.oid
WHERE t.ftrelid = :oid
"""
s = sql.text(
query, bindparams=[sql.bindparam('oid', type_=sqltypes.Integer)],
#.........这里部分代码省略.........
开发者ID:quantopian,项目名称:sqlalchemy_fdw,代码行数:101,代码来源:dialect.py
示例15:
from sqlalchemy.dialects import registry
# Register AkibanPsycopg2Dialect for both "akiban://" (bare key) and
# "akiban+psycopg2://" (dotted key) URLs.
registry.register("akiban", "sqlalchemy_akiban.dialect.psycopg2", "AkibanPsycopg2Dialect")
registry.register("akiban.psycopg2", "sqlalchemy_akiban.dialect.psycopg2", "AkibanPsycopg2Dialect")
from sqlalchemy.testing import runner
runner.main()
开发者ID:zzzeek,项目名称:sqlalchemy_akiban,代码行数:8,代码来源:run_tests.py
示例16: setup_py_test
from sqlalchemy.dialects import registry
# Point both the bare "postgresql" key and the dotted "postgresql.pg8000" key
# at PGDialect_pg8000 from the sqlalchemy_postgresql_pg8000 package.
registry.register(
    "postgresql", "sqlalchemy_postgresql_pg8000.pg8000",
    "PGDialect_pg8000")
registry.register("postgresql.pg8000", "sqlalchemy_postgresql_pg8000.pg8000",
                  "PGDialect_pg8000")

from sqlalchemy.testing import runner

# use this in setup.py 'test_suite':
# test_suite="run_tests.setup_py_test"
def setup_py_test():
    """Entry point for setup.py's ``test_suite``; delegates to the runner."""
    runner.setup_py_test()


if __name__ == '__main__':
    runner.main()
开发者ID:tlocke,项目名称:sqlalchemy-postgresql-pg8000,代码行数:17,代码来源:run_tests.py
示例17:
__version__ = '0.1.8'
from sqlalchemy.dialects import registry
# Disabled registrations that pointed at sqlalchemy_netezza.pyodbc:
#registry.register("netezza", "sqlalchemy_netezza.pyodbc", "NetezzaDialect")
#registry.register("netezza.pyodbc", "sqlalchemy_netezza.pyodbc", "NetezzaDialect")
# Active registrations: "netezza://" and "netezza+pyodbc://" both resolve to
# NetezzaODBC in the netezza_dialect module.
registry.register("netezza", "netezza_dialect", "NetezzaODBC")
registry.register("netezza.pyodbc", "netezza_dialect", "NetezzaODBC")
开发者ID:GeoffOs,项目名称:netezza_sqlalchemy,代码行数:9,代码来源:__init__.py
示例18: test_register_base
def test_register_base(self):
    """Check that a bare dialect name can be registered by class name.

    ``MockDialect`` is registered from this module under "mockdialect"; an
    engine created from "mockdialect://" must then use that class.
    """
    registry.register("mockdialect", __name__, "MockDialect")

    engine = create_engine("mockdialect://")
    assert isinstance(engine.dialect, MockDialect)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:5,代码来源:test_parseconnect.py
示例19: import
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import re
from sqlalchemy.dialects import registry
from sqlalchemy.dialects.postgresql.base import PGDialect
from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.types import (BOOLEAN, SMALLINT, BIGINT, TIMESTAMP, FLOAT,
DECIMAL, Integer, Float, String)
# Make "impala://" URLs resolve to ImpalaDialect in impala.sqlalchemy.
registry.register('impala', 'impala.sqlalchemy', 'ImpalaDialect')
class TINYINT(Integer):
    """Integer subtype carrying the visit name 'TINYINT'."""

    __visit_name__ = 'TINYINT'
class INT(Integer):
    """Integer subtype carrying the visit name 'INT'."""

    __visit_name__ = 'INT'
class DOUBLE(Float):
    """Float subtype carrying the visit name 'DOUBLE'."""

    __visit_name__ = 'DOUBLE'
class STRING(String):
开发者ID:mrocklin,项目名称:impyla,代码行数:31,代码来源:sqlalchemy.py
示例20: get_distribution
from pkg_resources import get_distribution
__version__ = get_distribution('sqlalchemy-redshift').version
from sqlalchemy.dialects import registry
# Bare key: serves "redshift://" URLs.
registry.register("redshift", "redshift_sqlalchemy.dialect", "RedshiftDialect")
# Dotted key: a "redshift+psycopg2://" URL is looked up in the registry with
# the "+" replaced by ".", so this is the key that lookup actually uses.
registry.register(
    "redshift.psycopg2", "redshift_sqlalchemy.dialect", "RedshiftDialect"
)
# Kept for backward compatibility with code that loads this literal key
# directly; URL-based lookups never use the "+" form.
registry.register(
    "redshift+psycopg2", "redshift_sqlalchemy.dialect", "RedshiftDialect"
)
开发者ID:jklukas,项目名称:redshift_sqlalchemy,代码行数:10,代码来源:__init__.py
注：本文中的sqlalchemy.dialects.registry.register函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论