本文整理汇总了Python中sqlalchemy.engine.reflection.Inspector类的典型用法代码示例。如果您正苦于以下问题:Python Inspector类的具体用法?Python Inspector怎么用?Python Inspector使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Inspector类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _test_get_foreign_keys
def _test_get_foreign_keys(self, schema=None):
meta = MetaData(testing.db)
(users, addresses) = createTables(meta, schema)
meta.create_all()
insp = Inspector(meta.bind)
try:
expected_schema = schema
# users
users_fkeys = insp.get_foreign_keys(users.name,
schema=schema)
fkey1 = users_fkeys[0]
self.assert_(fkey1['name'] is not None)
eq_(fkey1['referred_schema'], expected_schema)
eq_(fkey1['referred_table'], users.name)
eq_(fkey1['referred_columns'], ['user_id', ])
eq_(fkey1['constrained_columns'], ['parent_user_id'])
#addresses
addr_fkeys = insp.get_foreign_keys(addresses.name,
schema=schema)
fkey1 = addr_fkeys[0]
self.assert_(fkey1['name'] is not None)
eq_(fkey1['referred_schema'], expected_schema)
eq_(fkey1['referred_table'], users.name)
eq_(fkey1['referred_columns'], ['user_id', ])
eq_(fkey1['constrained_columns'], ['remote_user_id'])
finally:
addresses.drop()
users.drop()
开发者ID:clones,项目名称:sqlalchemy,代码行数:28,代码来源:test_reflection.py
示例2: find_pending
def find_pending(db):
engine = sa.create_engine(db)
inspector = Inspector(engine)
# Newer buildbot has a "buildrequest_claims" table
if "buildrequest_claims" in inspector.get_table_names():
query = sa.text("""
SELECT buildername, count(*) FROM
buildrequests WHERE
complete=0 AND
submitted_at > :yesterday AND
submitted_at < :toonew AND
(select count(*) from buildrequest_claims where brid=id) = 0
GROUP BY buildername""")
# Older buildbot doesn't
else:
query = sa.text("""
SELECT buildername, count(*) FROM
buildrequests WHERE
complete=0 AND
claimed_at=0 AND
submitted_at > :yesterday AND
submitted_at < :toonew
GROUP BY buildername""")
result = engine.execute(
query,
yesterday=time.time() - 86400,
toonew=time.time() - 60
)
retval = result.fetchall()
return retval
开发者ID:jhopkinsmoz,项目名称:build-cloud-tools,代码行数:33,代码来源:aws_watch_pending.py
示例3: _test_get_indexes
def _test_get_indexes(self, schema=None):
meta = MetaData(testing.db)
(users, addresses) = createTables(meta, schema)
meta.create_all()
createIndexes(meta.bind, schema)
try:
# The database may decide to create indexes for foreign keys, etc.
# so there may be more indexes than expected.
insp = Inspector(meta.bind)
indexes = insp.get_indexes('users', schema=schema)
indexes.sort()
if testing.against('oracle'):
expected_indexes = [
{'unique': False,
'column_names': ['TEST1', 'TEST2'],
'name': 'USERS_T_IDX'}]
else:
expected_indexes = [
{'unique': False,
'column_names': ['test1', 'test2'],
'name': 'users_t_idx'}]
index_names = [d['name'] for d in indexes]
for e_index in expected_indexes:
self.assertTrue(e_index['name'] in index_names)
index = indexes[index_names.index(e_index['name'])]
for key in e_index:
self.assertEqual(e_index[key], index[key])
finally:
addresses.drop()
users.drop()
开发者ID:,项目名称:,代码行数:31,代码来源:
示例4: _test_get_foreign_keys
def _test_get_foreign_keys(self, schema=None):
meta = MetaData(testing.db)
(users, addresses) = createTables(meta, schema)
meta.create_all()
insp = Inspector(meta.bind)
try:
expected_schema = schema
if schema is None:
try:
expected_schema = meta.bind.dialect.get_default_schema_name(
meta.bind)
except NotImplementedError:
expected_schema = None
# users
users_fkeys = insp.get_foreign_keys(users.name,
schema=schema)
fkey1 = users_fkeys[0]
self.assert_(fkey1['name'] is not None)
self.assertEqual(fkey1['referred_schema'], expected_schema)
self.assertEqual(fkey1['referred_table'], users.name)
self.assertEqual(fkey1['referred_columns'], ['user_id', ])
self.assertEqual(fkey1['constrained_columns'], ['parent_user_id'])
#addresses
addr_fkeys = insp.get_foreign_keys(addresses.name,
schema=schema)
fkey1 = addr_fkeys[0]
self.assert_(fkey1['name'] is not None)
self.assertEqual(fkey1['referred_schema'], expected_schema)
self.assertEqual(fkey1['referred_table'], users.name)
self.assertEqual(fkey1['referred_columns'], ['user_id', ])
self.assertEqual(fkey1['constrained_columns'], ['remote_user_id'])
finally:
addresses.drop()
users.drop()
开发者ID:,项目名称:,代码行数:34,代码来源:
示例5: _test_selfref_fk
def _test_selfref_fk(self, recreate):
bar = Table(
'bar', self.metadata,
Column('id', Integer, primary_key=True),
Column('bar_id', Integer, ForeignKey('bar.id')),
Column('data', String(50)),
mysql_engine='InnoDB'
)
bar.create(self.conn)
self.conn.execute(bar.insert(), {'id': 1, 'data': 'x', 'bar_id': None})
self.conn.execute(bar.insert(), {'id': 2, 'data': 'y', 'bar_id': 1})
with self.op.batch_alter_table("bar", recreate=recreate) as batch_op:
batch_op.alter_column(
'data', new_column_name='newdata', existing_type=String(50))
insp = Inspector.from_engine(self.conn)
insp = Inspector.from_engine(self.conn)
eq_(
[(key['referred_table'],
key['referred_columns'], key['constrained_columns'])
for key in insp.get_foreign_keys('bar')],
[('bar', ['id'], ['bar_id'])]
)
开发者ID:davidszotten,项目名称:alembic,代码行数:25,代码来源:test_batch.py
示例6: _test_get_table_names
def _test_get_table_names(self, schema=None, table_type='table',
order_by=None):
meta = MetaData(testing.db)
(users, addresses) = createTables(meta, schema)
meta.create_all()
createViews(meta.bind, schema)
try:
insp = Inspector(meta.bind)
if table_type == 'view':
table_names = insp.get_view_names(schema)
table_names.sort()
answer = ['email_addresses_v', 'users_v']
else:
table_names = insp.get_table_names(schema,
order_by=order_by)
table_names.sort()
if order_by == 'foreign_key':
answer = ['users', 'email_addresses']
else:
answer = ['email_addresses', 'users']
self.assertEqual(table_names, answer)
finally:
dropViews(meta.bind, schema)
addresses.drop()
users.drop()
开发者ID:,项目名称:,代码行数:25,代码来源:
示例7: _expect_default
def _expect_default(self, c_expected, col, seq=None):
Table('t', self.metadata, col)
self.autogen_context.metadata = self.metadata
if seq:
seq._set_metadata(self.metadata)
self.metadata.create_all(config.db)
insp = Inspector.from_engine(config.db)
uo = ops.UpgradeOps(ops=[])
_compare_tables(
set([(None, 't')]), set([]),
insp, uo, self.autogen_context)
diffs = uo.as_diffs()
tab = diffs[0][1]
eq_(_render_server_default_for_compare(
tab.c.x.server_default, tab.c.x, self.autogen_context),
c_expected)
insp = Inspector.from_engine(config.db)
uo = ops.UpgradeOps(ops=[])
m2 = MetaData()
Table('t', m2, Column('x', BigInteger()))
self.autogen_context.metadata = m2
_compare_tables(
set([(None, 't')]), set([(None, 't')]),
insp, uo, self.autogen_context)
diffs = uo.as_diffs()
server_default = diffs[0][0][4]['existing_server_default']
eq_(_render_server_default_for_compare(
server_default, tab.c.x, self.autogen_context),
c_expected)
开发者ID:RazerM,项目名称:alembic,代码行数:35,代码来源:test_postgresql.py
示例8: _expect_default
def _expect_default(self, c_expected, col, seq=None):
Table('t', self.metadata, col)
if seq:
seq._set_metadata(self.metadata)
self.metadata.create_all(config.db)
insp = Inspector.from_engine(config.db)
diffs = []
_compare_tables(
set([(None, 't')]), set([]),
[],
insp, self.metadata, diffs, self.autogen_context)
tab = diffs[0][1]
eq_(_render_server_default_for_compare(
tab.c.x.server_default, tab.c.x, self.autogen_context),
c_expected)
insp = Inspector.from_engine(config.db)
diffs = []
m2 = MetaData()
Table('t', m2, Column('x', BigInteger()))
_compare_tables(
set([(None, 't')]), set([(None, 't')]),
[],
insp, m2, diffs, self.autogen_context)
server_default = diffs[0][0][4]['existing_server_default']
eq_(_render_server_default_for_compare(
server_default, tab.c.x, self.autogen_context),
c_expected)
开发者ID:SvenDowideit,项目名称:clearlinux,代码行数:30,代码来源:test_postgresql.py
示例9: test_autogen
def test_autogen(self):
m = sa.MetaData()
sa.Table('t', m, sa.Column('x', sa.Integer))
def process_revision_directives(context, rev, generate_revisions):
existing_upgrades = generate_revisions[0].upgrade_ops
existing_downgrades = generate_revisions[0].downgrade_ops
# model1 will run the upgrades, e.g. create the table,
# model2 will run the downgrades as upgrades, e.g. drop
# the table again
generate_revisions[:] = [
ops.MigrationScript(
util.rev_id(),
existing_upgrades,
ops.DowngradeOps(),
version_path=os.path.join(
_get_staging_directory(), "model1"),
head="[email protected]"
),
ops.MigrationScript(
util.rev_id(),
existing_downgrades,
ops.DowngradeOps(),
version_path=os.path.join(
_get_staging_directory(), "model2"),
head="[email protected]"
)
]
with self._env_fixture(process_revision_directives, m):
command.upgrade(self.cfg, "heads")
eq_(
Inspector.from_engine(self.engine).get_table_names(),
["alembic_version"]
)
command.revision(
self.cfg, message="some message",
autogenerate=True)
command.upgrade(self.cfg, "[email protected]")
eq_(
Inspector.from_engine(self.engine).get_table_names(),
["alembic_version", "t"]
)
command.upgrade(self.cfg, "[email protected]")
eq_(
Inspector.from_engine(self.engine).get_table_names(),
["alembic_version"]
)
开发者ID:graingert,项目名称:alembic,代码行数:56,代码来源:test_script_production.py
示例10: add_municipality_domain
def add_municipality_domain(context):
# Rename the columns
renames = (
('elections', 'total_municipalities', 'total_entities'),
('elections', 'counted_municipalities', 'counted_entities'),
('election_results', 'municipality_id', 'entity_id'),
('ballot_results', 'municipality_id', 'entity_id'),
)
for table, old, new in renames:
if context.has_column(table, old):
context.operations.alter_column(table, old, new_column_name=new)
# Add the new domain, see http://stackoverflow.com/a/14845740
table_names = []
inspector = Inspector(context.operations_connection)
if 'elections' in inspector.get_table_names(context.schema):
table_names.append('elections')
if 'election_compounds' in inspector.get_table_names(context.schema):
table_names.append('election_compounds')
if 'votes' in inspector.get_table_names(context.schema):
table_names.append('votes')
if 'archived_results' in inspector.get_table_names(context.schema):
table_names.append('archived_results')
old_type = Enum('federation', 'canton', name='domain_of_influence')
new_type = Enum('federation', 'canton', 'municipality',
name='domain_of_influence')
tmp_type = Enum('federation', 'canton', 'municipality',
name='_domain_of_influence')
tmp_type.create(context.operations.get_bind(), checkfirst=False)
for table_name in table_names:
context.operations.execute(
(
'ALTER TABLE {} ALTER COLUMN domain TYPE _domain_of_influence '
'USING domain::text::_domain_of_influence'
).format(table_name)
)
old_type.drop(context.operations.get_bind(), checkfirst=False)
new_type.create(context.operations.get_bind(), checkfirst=False)
for table_name in table_names:
context.operations.execute(
(
'ALTER TABLE {} ALTER COLUMN domain TYPE domain_of_influence '
'USING domain::text::domain_of_influence'
).format(table_name)
)
tmp_type.drop(context.operations.get_bind(), checkfirst=False)
开发者ID:OneGov,项目名称:onegov.ballot,代码行数:54,代码来源:upgrade.py
示例11: test_dont_reflect_autoindex
def test_dont_reflect_autoindex(self):
meta = self.metadata
Table('foo', meta, Column('bar', String, primary_key=True))
meta.create_all()
inspector = Inspector(testing.db)
eq_(inspector.get_indexes('foo'), [])
eq_(
inspector.get_indexes('foo', include_auto_indexes=True),
[{
'unique': 1,
'name': 'sqlite_autoindex_foo_1',
'column_names': ['bar']}])
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:12,代码来源:test_sqlite.py
示例12: test_create_index_with_schema
def test_create_index_with_schema(self):
"""Test creation of index with explicit schema"""
meta = self.metadata
Table(
'foo', meta, Column('bar', String, index=True),
schema='main')
meta.create_all()
inspector = Inspector(testing.db)
eq_(
inspector.get_indexes('foo', schema='main'),
[{'unique': 0, 'name': u'ix_main_foo_bar',
'column_names': [u'bar']}])
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:13,代码来源:test_sqlite.py
示例13: test_dont_reflect_autoindex
def test_dont_reflect_autoindex(self):
meta = MetaData(testing.db)
t = Table('foo', meta, Column('bar', String, primary_key=True))
meta.create_all()
from sqlalchemy.engine.reflection import Inspector
try:
inspector = Inspector(testing.db)
eq_(inspector.get_indexes('foo'), [])
eq_(inspector.get_indexes('foo',
include_auto_indexes=True), [{'unique': 1, 'name'
: 'sqlite_autoindex_foo_1', 'column_names': ['bar']}])
finally:
meta.drop_all()
开发者ID:Callek,项目名称:sqlalchemy,代码行数:13,代码来源:test_sqlite.py
示例14: test_dont_reflect_autoindex
def test_dont_reflect_autoindex(self):
meta = MetaData(testing.db)
t = Table("foo", meta, Column("bar", String, primary_key=True))
meta.create_all()
from sqlalchemy.engine.reflection import Inspector
try:
inspector = Inspector(testing.db)
eq_(inspector.get_indexes("foo"), [])
eq_(
inspector.get_indexes("foo", include_auto_indexes=True),
[{"unique": 1, "name": "sqlite_autoindex_foo_1", "column_names": ["bar"]}],
)
finally:
meta.drop_all()
开发者ID:niaolianyu,项目名称:sqlalchemy,代码行数:15,代码来源:test_sqlite.py
示例15: test_get_unique_constraints
def test_get_unique_constraints(self):
meta = self.metadata
Table(
'foo', meta, Column('f', Integer),
UniqueConstraint('f', name='foo_f'))
Table(
'bar', meta, Column('b', Integer),
UniqueConstraint('b', name='bar_b'),
prefixes=['TEMPORARY'])
meta.create_all()
inspector = Inspector(testing.db)
eq_(inspector.get_unique_constraints('foo'),
[{'column_names': [u'f'], 'name': u'foo_f'}])
eq_(inspector.get_unique_constraints('bar'),
[{'column_names': [u'b'], 'name': u'bar_b'}])
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:15,代码来源:test_sqlite.py
示例16: handle
def handle(self, options, global_options, *args):
output_dir = os.path.join(options.output_dir, options.engine)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
engine = get_engine(options, global_options)
if not args:
print "Failed! You should pass one or more tables name."
sys.exit(1)
inspector = Inspector.from_engine(engine)
tables = get_tables(global_options.apps_dir, tables=args,
engine=options.engine, settings_file=global_options.settings,
local_settings_file=global_options.local_settings)
for tablename, t in tables.items():
if global_options.verbose:
print '[%s] Dumpping %s...' % (options.engine, tablename)
filename = os.path.join(output_dir, tablename+'.txt')
if options.text:
format = 'txt'
else:
format = None
dump_table(t, filename, engine, delimiter=options.delimiter,
format=format, encoding=options.encoding, inspector=inspector)
开发者ID:tangjn,项目名称:uliweb,代码行数:28,代码来源:commands.py
示例17: delete_all_tables
def delete_all_tables(db):
"""Drops all tables in the database"""
conn = db.engine.connect()
transaction = conn.begin()
inspector = Inspector.from_engine(db.engine)
metadata = MetaData()
all_schema_tables = get_all_tables(db)
tables = []
all_fkeys = []
for schema, schema_tables in all_schema_tables.iteritems():
for table_name in schema_tables:
fkeys = [ForeignKeyConstraint((), (), name=fk['name'])
for fk in inspector.get_foreign_keys(table_name, schema=schema)
if fk['name']]
tables.append(Table(table_name, metadata, *fkeys, schema=schema))
all_fkeys.extend(fkeys)
for fkey in all_fkeys:
conn.execute(DropConstraint(fkey))
for table in tables:
conn.execute(DropTable(table))
for schema in all_schema_tables:
if schema != 'public':
conn.execute(DropSchema(schema))
transaction.commit()
开发者ID:pferreir,项目名称:indico-backup,代码行数:26,代码来源:management.py
示例18: _compare_default
def _compare_default(self, t1, t2, col, rendered):
t1.create(self.bind, checkfirst=True)
insp = Inspector.from_engine(self.bind)
cols = insp.get_columns(t1.name)
ctx = self.autogen_context.migration_context
return ctx.impl.compare_server_default(None, col, rendered, cols[0]["default"])
开发者ID:chishaku,项目名称:alembic,代码行数:7,代码来源:test_postgresql.py
示例19: setup
def setup():
if harvest_source_table is None:
define_harvester_tables()
log.debug('Harvest tables defined in memory')
if model.package_table.exists():
if not harvest_source_table.exists():
# Create each table individually rather than
# using metadata.create_all()
harvest_source_table.create()
harvest_job_table.create()
harvest_object_table.create()
harvest_gather_error_table.create()
harvest_object_error_table.create()
log.debug('Harvest tables created')
else:
from ckan.model.meta import engine
log.debug('Harvest tables already exist')
# Check if existing tables need to be updated
inspector = Inspector.from_engine(engine)
columns = inspector.get_columns('harvest_source')
if not 'title' in [column['name'] for column in columns]:
log.debug('Harvest tables need to be updated')
migrate_v2()
else:
log.debug('Harvest table creation deferred')
开发者ID:SenseTecnic,项目名称:ckanext-harvest,代码行数:30,代码来源:__init__.py
示例20: _test_get_primary_keys
def _test_get_primary_keys(self, schema=None):
meta = MetaData(testing.db)
(users, addresses) = createTables(meta, schema)
meta.create_all()
insp = Inspector(meta.bind)
try:
users_pkeys = insp.get_primary_keys(users.name,
schema=schema)
self.assertEqual(users_pkeys, ['user_id'])
addr_pkeys = insp.get_primary_keys(addresses.name,
schema=schema)
self.assertEqual(addr_pkeys, ['address_id'])
finally:
addresses.drop()
users.drop()
开发者ID:,项目名称:,代码行数:16,代码来源:
注:本文中的sqlalchemy.engine.reflection.Inspector类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论