本文整理汇总了Python中sqlalchemy.MetaData类的典型用法代码示例。如果您正苦于以下问题：Python MetaData类的具体用法？Python MetaData怎么用？Python MetaData使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了MetaData类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: upgrade
def upgrade(migrate_engine):
    """Create the ``snapshot_metadata`` table.

    The new table holds ``key``/``value`` string pairs and points at
    ``snapshots.id`` through its non-nullable ``snapshot_id`` column.

    :param migrate_engine: engine that the migration metadata is bound to.
    :raises Exception: anything raised by ``create()``; it is logged and
        then re-raised unchanged.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    # Reflect the referenced table into this MetaData; presumably this is
    # what lets the 'snapshots.id' foreign key below resolve.
    Table('snapshots', meta, autoload=True)

    table_columns = [
        Column('created_at', DateTime),
        Column('updated_at', DateTime),
        Column('deleted_at', DateTime),
        Column('deleted', Boolean),
        Column('id', Integer, primary_key=True, nullable=False),
        Column('snapshot_id', String(length=36),
               ForeignKey('snapshots.id'), nullable=False),
        Column('key', String(length=255)),
        Column('value', String(length=255)),
    ]
    snapshot_metadata = Table('snapshot_metadata', meta, *table_columns,
                              mysql_engine='InnoDB')

    try:
        snapshot_metadata.create()
    except Exception:
        # Record which table failed, then let the migration abort.
        LOG.error(_("Table |%s| not created!"), repr(snapshot_metadata))
        raise
开发者ID:adkerr,项目名称:cinder,代码行数:26,代码来源:009_add_snapshot_metadata_table.py
示例2: drop_unique_constraint
def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
                           **col_name_col_instance):
    """Drop a named unique constraint from a reflected table.

    On sqlite, every reflected column whose type came back as ``NullType``
    is first swapped for the caller-supplied replacement obtained from
    ``_get_not_supported_column``; the constraint is then dropped the same
    way for every backend.

    :param migrate_engine: sqlalchemy engine
    :param table_name: name of the table that owns the unique constraint.
    :param uc_name: name of the unique constraint to drop.
    :param columns: columns that make up the unique constraint.
    :param col_name_col_instance: ``column_name=column_instance`` pairs,
        where ``column_instance`` is a ``Column``. Only consulted for
        columns reflected as ``NullType`` on sqlite (the original notes
        give BigInteger as an example).
    """
    meta = MetaData()
    meta.bind = migrate_engine
    table = Table(table_name, meta, autoload=True)

    if migrate_engine.name == "sqlite":
        # Collect every replacement before touching the table so a failed
        # lookup leaves the reflected columns untouched.
        replacements = []
        for reflected in table.columns:
            if isinstance(reflected.type, NullType):
                replacements.append(
                    _get_not_supported_column(col_name_col_instance,
                                              reflected.name))
        for replacement in replacements:
            table.columns.replace(replacement)

    constraint = UniqueConstraint(*columns, table=table, name=uc_name)
    constraint.drop()
开发者ID:n-nishida,项目名称:rack,代码行数:35,代码来源:utils.py
示例3: drop_unique_constraint
def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
                           **col_name_col_instance):
    """Drop a named unique constraint, choosing a strategy per backend.

    For mysql and postgresql the table is reflected and the constraint is
    dropped directly. Every other engine is handed to
    ``_drop_unique_constraint_in_sqlite``; per the original notes that path
    rebuilds the table without the constraint (create a new table, copy the
    data, drop the old table, rename the new one).

    :param migrate_engine: sqlalchemy engine
    :param table_name: name of the table that owns the unique constraint.
    :param uc_name: name of the unique constraint to drop.
    :param columns: columns that make up the unique constraint.
    :param col_name_col_instance: ``column_name=column_instance`` pairs,
        where ``column_instance`` is a ``Column``. Forwarded to the sqlite
        helper only; needed for column types sqlite reflection cannot
        represent (e.g. BigInteger).
    """
    if migrate_engine.name not in ("mysql", "postgresql"):
        _drop_unique_constraint_in_sqlite(migrate_engine, table_name,
                                          uc_name, **col_name_col_instance)
        return

    meta = MetaData()
    meta.bind = migrate_engine
    table = Table(table_name, meta, autoload=True)
    UniqueConstraint(*columns, table=table, name=uc_name).drop()
开发者ID:faye89,项目名称:nova,代码行数:30,代码来源:utils.py
示例4: InvalidateDuringResultTest
class InvalidateDuringResultTest(fixtures.TestBase):
    """Check that a connection is invalidated when the database goes away
    while a result set is being read."""

    __backend__ = True

    def setup(self):
        """Create ``sometable`` on a reconnecting engine and fill it with
        rows whose ids run from 1 to 99."""
        self.engine = engines.reconnecting_engine()
        self.meta = MetaData(self.engine)
        sometable = Table(
            'sometable', self.meta,
            Column('id', Integer, primary_key=True),
            Column('name', String(50)),
        )
        self.meta.create_all()

        rows = []
        for row_id in range(1, 100):
            rows.append({'id': row_id, 'name': 'row %d' % row_id})
        sometable.insert().execute(rows)

    def teardown(self):
        """Drop the test table and dispose of the engine."""
        self.meta.drop_all()
        self.engine.dispose()

    @testing.fails_if(
        ['+mysqlconnector', '+mysqldb', '+cymysql', '+pymysql', '+pg8000'],
        "Buffers the result set and doesn't check for connection close")
    def test_invalidate_on_results(self):
        """Fetch part of a result, shut the engine down, then expect the
        next fetch to invalidate the connection."""
        conn = self.engine.connect()
        result = conn.execute('select * from sometable')

        # Consume the first 20 rows while the database is still up.
        for _ in range(20):
            result.fetchone()

        self.engine.test_shutdown()
        _assert_invalidated(result.fetchone)
        assert conn.invalidated
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:30,代码来源:test_reconnect.py
示例5: setup_class
def setup_class(cls):
    """Create two FTS3 virtual tables, reflect them and load fixture rows.

    Publishes ``metadata``, ``cattable`` and ``matchtable`` as module-level
    globals.
    """
    global metadata, cattable, matchtable
    metadata = MetaData(testing.db)

    # The SQL text is deliberately flush-left so the statements sent to the
    # database are unchanged.
    testing.db.execute("""
CREATE VIRTUAL TABLE cattable using FTS3 (
id INTEGER NOT NULL,
description VARCHAR(50),
PRIMARY KEY (id)
)
""")
    cattable = Table('cattable', metadata, autoload=True)

    testing.db.execute("""
CREATE VIRTUAL TABLE matchtable using FTS3 (
id INTEGER NOT NULL,
title VARCHAR(200),
category_id INTEGER NOT NULL,
PRIMARY KEY (id)
)
""")
    matchtable = Table('matchtable', metadata, autoload=True)

    metadata.create_all()

    category_rows = [
        {'id': 1, 'description': 'Python'},
        {'id': 2, 'description': 'Ruby'},
    ]
    cattable.insert().execute(category_rows)

    title_rows = [
        {'id': 1, 'title': 'Agile Web Development with Rails',
         'category_id': 2},
        {'id': 2, 'title': 'Dive Into Python', 'category_id': 1},
        {'id': 3, 'title': "Programming Matz's Ruby", 'category_id': 2},
        {'id': 4, 'title': 'The Definitive Guide to Django',
         'category_id': 1},
        {'id': 5, 'title': 'Python in a Nutshell', 'category_id': 1},
    ]
    matchtable.insert().execute(title_rows)
开发者ID:Callek,项目名称:sqlalchemy,代码行数:34,代码来源:test_sqlite.py
示例6: upgrade
def upgrade(migrate_engine):
    """Create the ``migrations`` table.

    :param migrate_engine: engine supplied by the migration runner; it is
        bound to the metadata rather than creating a separate engine.
    :raises Exception: anything raised by ``create()``; it is logged and
        then re-raised unchanged.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    # Reflect 'instances' so the 'instances.id' foreign key has a target
    # table in this MetaData.
    Table('instances', meta, autoload=True)

    # New table definition.
    table_columns = (
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean(create_constraint=True, name=None)),
        Column('id', Integer(), primary_key=True, nullable=False),
        Column('source_compute', String(255)),
        Column('dest_compute', String(255)),
        Column('dest_host', String(255)),
        Column('instance_id', Integer, ForeignKey('instances.id'),
               nullable=True),
        Column('status', String(255)),
    )
    migrations = Table('migrations', meta, *table_columns)

    try:
        migrations.create()
    except Exception:
        # Log the table definition first, then the traceback.
        LOG.info(repr(migrations))
        LOG.exception('Exception while creating table')
        raise
开发者ID:AnyBucket,项目名称:OpenStack-Install-and-Understand-Guide,代码行数:33,代码来源:009_add_instance_migrations.py
示例7: check_shadow_table
def check_shadow_table(migrate_engine, table_name):
    """Verify that a table and its shadow table have matching columns.

    Both tables are reflected from ``migrate_engine``. The check passes when
    every column of the main table exists in the shadow table with a type
    that is an instance of the main column's type class, and the shadow
    table has no columns the main table lacks.

    :param migrate_engine: engine used to reflect both tables.
    :param table_name: name of the main table; the shadow table name is
        ``db._SHADOW_TABLE_PREFIX + table_name``.
    :returns: ``True`` when the two tables match.
    :raises exception.NovaException: on a missing column, a column type
        mismatch, or an extra column in the shadow table.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    table = Table(table_name, meta, autoload=True)
    shadow_table = Table(db._SHADOW_TABLE_PREFIX + table_name, meta,
                         autoload=True)

    columns = {c.name: c for c in table.columns}
    shadow_columns = {c.name: c for c in shadow_table.columns}

    # .items() instead of .iteritems(): the latter does not exist on
    # Python 3, while .items() works on both major versions.
    for name, column in columns.items():
        if name not in shadow_columns:
            raise exception.NovaException(
                _("Missing column %(table)s.%(column)s in shadow table")
                % {'column': name, 'table': shadow_table.name})

        shadow_column = shadow_columns[name]
        if not isinstance(shadow_column.type, type(column.type)):
            raise exception.NovaException(
                _("Different types in %(table)s.%(column)s and shadow table: "
                  "%(c_type)s %(shadow_c_type)s")
                % {'column': name, 'table': table.name,
                   'c_type': column.type,
                   'shadow_c_type': shadow_column.type})

    # Only the names matter for the reverse check, so iterate the keys.
    for name in shadow_columns:
        if name not in columns:
            raise exception.NovaException(
                _("Extra column %(table)s.%(column)s in shadow table")
                % {'column': name, 'table': shadow_table.name})

    return True
开发者ID:Dynavisor,项目名称:nova,代码行数:35,代码来源:utils.py
示例8: upgrade
def upgrade(migrate_engine):
    """Add the ``workers`` table and link it to ``services``.

    The table is created first; a foreign key from ``workers.service_id``
    to ``services.id`` is then created as a separate constraint.

    :param migrate_engine: engine that the migration metadata is bound to.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    # Columns the original marks as inherited from CinderBase.
    base_columns = [
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean(), default=False),
    ]
    # Columns specific to the workers table.
    worker_columns = [
        Column('id', Integer, primary_key=True),
        Column('resource_type', String(40), nullable=False),
        Column('resource_id', String(36), nullable=False),
        Column('status', String(255), nullable=False),
        Column('service_id', Integer, nullable=True),
    ]
    table_items = base_columns + worker_columns
    table_items.append(UniqueConstraint('resource_type', 'resource_id'))

    workers = Table('workers', meta, *table_items,
                    mysql_engine='InnoDB',
                    mysql_charset='utf8')
    workers.create()

    services = Table('services', meta, autoload=True)
    service_fk = ForeignKeyConstraint(columns=[workers.c.service_id],
                                      refcolumns=[services.c.id])
    service_fk.create()
开发者ID:C2python,项目名称:cinder,代码行数:32,代码来源:076_add_workers_table.py
示例9: test_use_alter
def test_use_alter(self):
    """Check the DDL emitted for a ``use_alter`` foreign key.

    Against a mock postgresql engine, ``create_all`` followed by
    ``drop_all`` must emit the ``fk_tb`` constraint inline in CREATE TABLE,
    while ``fk_ta`` (declared with ``use_alter=True``) is added and dropped
    through ALTER TABLE statements.
    """
    metadata = MetaData()
    Table('t', metadata, Column('a', Integer))

    col_a = Column('a', Integer,
                   ForeignKey('t.a', use_alter=True, name='fk_ta'))
    col_b = Column('b', Integer, ForeignKey('t.a', name='fk_tb'))
    Table('t2', metadata, col_a, col_b)

    mock = engines.mock_engine(dialect_name='postgresql')
    metadata.create_all(mock)
    metadata.drop_all(mock)

    expected_sql = [
        'CREATE TABLE t (a INTEGER)',
        'CREATE TABLE t2 (a INTEGER, b INTEGER, CONSTRAINT fk_tb '
        'FOREIGN KEY(b) REFERENCES t (a))',
        'ALTER TABLE t2 '
        'ADD CONSTRAINT fk_ta FOREIGN KEY(a) REFERENCES t (a)',
        'ALTER TABLE t2 DROP CONSTRAINT fk_ta',
        'DROP TABLE t2',
        'DROP TABLE t',
    ]
    mock.assert_sql(expected_sql)
开发者ID:Daniel-B-Smith,项目名称:sqlalchemy,代码行数:26,代码来源:test_constraints.py
示例10: init_db
def init_db():
    """Define the ``deals`` and ``price_check_history`` tables and create
    them in the file-based SQLite database ``tweet.db``.

    :returns: tuple ``(engine, deals_table, price_check_history_table)``.
    """
    schema = MetaData()

    # Table of tweeted deals.
    deals = Table(
        'deals', schema,
        Column('deal_id', Integer(), primary_key=True),
        Column('price', String(15)),
        Column('url', String(255)),
        Column('description', String(255)),
        Column('tweet_id', BigInteger()),
        Column('analyzed', DateTime(), default=datetime.now),
        Column('updated', DateTime(), default=datetime.now,
               onupdate=datetime.now),
    )

    # Table of merchant price matches.
    history = Table(
        'price_check_history', schema,
        Column('match_id', Integer(), primary_key=True),
        Column('merchant', String(75)),
        Column('url', String(255)),
        Column('merchant_description', String(255)),
        Column('tweet_id', BigInteger()),
        Column('merchant_price', BigInteger()),
        Column('analyzed', DateTime(), default=datetime.now),
        Column('updated', DateTime(), default=datetime.now,
               onupdate=datetime.now),
    )

    # File-based SQLite database; create_all builds both tables in it.
    db_engine = create_engine('sqlite:///tweet.db', encoding='latin-1')
    schema.create_all(db_engine)

    # Hand back the engine and both table handles to the caller.
    return db_engine, deals, history
开发者ID:mthmn20,项目名称:personal_budget,代码行数:30,代码来源:batchie.py
示例11: downgrade
def downgrade(migrate_engine):
    """Remove the ``uuid`` column from ``virtual_interfaces``.

    :param migrate_engine: engine that the migration metadata is bound to.
    """
    meta = MetaData(bind=migrate_engine)
    # Reflect the live table, then drop the column from it.
    reflected = Table('virtual_interfaces', meta, autoload=True)
    reflected.drop_column('uuid')
开发者ID:AnyBucket,项目名称:OpenStack-Install-and-Understand-Guide,代码行数:7,代码来源:038_add_uuid_to_virtual_interfaces.py
示例12: downgrade
def downgrade(migrate_engine):
    """Remove the ``cache_mode`` column from ``storage_pools``.

    :param migrate_engine: engine that the migration metadata is bound to.
    """
    meta = MetaData(bind=migrate_engine)
    # Reflect the live table, then drop the column from it.
    reflected = Table('storage_pools', meta, autoload=True)
    reflected.drop_column('cache_mode')
开发者ID:01org,项目名称:virtual-storage-manager,代码行数:7,代码来源:031_add_cache_mode_to_storage_pools.py
示例13: upgrade
def upgrade(migrate_engine):
    """Create ``huawei_live_migrations`` and its ``shadow_`` twin.

    A table that already exists according to ``migrate_engine.has_table``
    is skipped.

    :param migrate_engine: engine that the migration metadata is bound to.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    # (positional args, keyword args) for each Column, in table order.
    column_specs = [
        (('created_at', DateTime), {}),
        (('updated_at', DateTime), {}),
        (('deleted_at', DateTime), {}),
        (('deleted', Integer), {}),
        (('id', Integer), {'primary_key': True, 'nullable': False}),
        (('instance_uuid', String(length=36)), {'nullable': False}),
        (('source_host', String(length=255)), {'nullable': True}),
        (('dest_host', String(length=255)), {'nullable': True}),
        (('dest_addr', String(length=255)), {'nullable': True}),
        (('block_migration', Boolean), {'nullable': True, 'default': False}),
        (('migrate_data', Text), {'nullable': True}),
    ]

    for prefix in ('', 'shadow_'):
        table_name = prefix + 'huawei_live_migrations'
        if not migrate_engine.has_table(table_name):
            # Build fresh Column objects for each table from the specs.
            fresh_columns = [Column(*args, **kwargs)
                             for args, kwargs in column_specs]
            new_table = Table(table_name, meta, *fresh_columns,
                              mysql_engine='InnoDB', mysql_charset='utf8')
            new_table.create()
开发者ID:HybridCloud-dew,项目名称:hws,代码行数:27,代码来源:256_add_huawei_livemigrations.py
示例14: upgrade
def upgrade(migrate_engine):
    """Add the ``backup_metadata`` table and an index on ``backup_id``.

    The index ``backup_metadata_backup_id_idx`` is added only when
    ``utils.index_exists_on_columns`` reports that ``backup_id`` is not
    indexed yet.

    :param migrate_engine: engine that the migration metadata is bound to.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    # Reflect 'backups' so the 'backups.id' foreign key has a target table
    # in this MetaData.
    Table('backups', meta, autoload=True)

    table_columns = [
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean(), default=False),
        Column('id', Integer, primary_key=True, nullable=False),
        Column('backup_id', String(36), ForeignKey('backups.id'),
               nullable=False),
        Column('key', String(255)),
        Column('value', String(255)),
    ]
    backup_metadata = Table('backup_metadata', meta, *table_columns,
                            mysql_engine='InnoDB',
                            mysql_charset='utf8')
    backup_metadata.create()

    already_indexed = utils.index_exists_on_columns(
        migrate_engine, 'backup_metadata', ['backup_id'])
    if not already_indexed:
        utils.add_index(migrate_engine,
                        'backup_metadata',
                        'backup_metadata_backup_id_idx',
                        ['backup_id'])
开发者ID:ebalduf,项目名称:cinder-backports,代码行数:33,代码来源:105_add_backup_metadata.py
示例15: upgrade
def upgrade(migrate_engine):
    """Rebuild ``shadow_instances`` when its ``locked_by`` type is wrong.

    If the string form of the reflected ``shadow_instances.locked_by`` type
    contains ``SHADOW_INSTANCES0LOCKED_BY``, the shadow table is dropped and
    recreated from the columns of ``instances``, with ``locked_by`` replaced
    by an ``Enum('owner', 'admin')`` named ``INSTANCES0LOCKED_BY``.
    Otherwise nothing is changed.

    :param migrate_engine: engine that the migration metadata is bound to.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    shadow_table = Table('shadow_instances', meta, autoload=True)
    locked_by_type = str(shadow_table.c.locked_by.type)

    # Guard clause: nothing to do unless the shadow-specific type is in use.
    if "SHADOW_INSTANCES0LOCKED_BY" not in locked_by_type:
        LOG.info("the shadow instance table don't need to convert.")
        return

    LOG.info("the shadow instance table need to convert.")
    shadow_table.drop()

    table = Table('instances', meta, autoload=True)
    columns = []
    for column in table.columns:
        if column.name == 'locked_by':
            enum = Enum('owner', 'admin', name='INSTANCES0LOCKED_BY')
            column_copy = Column(column.name, enum)
        else:
            column_copy = column.copy()
        columns.append(column_copy)

    # extend_existing: 'shadow_instances' is still registered in ``meta``
    # from the reflection above, so the definition is replaced in place.
    shadow_table = Table('shadow_instances', meta, *columns,
                         mysql_engine='InnoDB', extend_existing=True)
    shadow_table.create(checkfirst=True)
开发者ID:HybridCloud-dew,项目名称:hws,代码行数:25,代码来源:259_bug_fix_shadow_instances_locked_by.py
示例16: test_clauseelement
def test_clauseelement(self):
    """Check ``bind=`` handling for a select, a SQL function and text().

    Each construct is built once bound to the engine and once bound to a
    connection; it must report that bind and execute. Built without a bind
    it must report ``None`` and raise ``UnboundExecutionError`` on execute.
    The test table is dropped at the end regardless of the outcome.
    """
    metadata = MetaData()
    table = Table('test_table', metadata,
                  Column('foo', Integer))
    metadata.create_all(bind=testing.db)

    # The outer ``finally`` inspects ``bind``. Pre-binding the name keeps
    # that cleanup from raising NameError (and hiding the real error) when
    # something fails before the inner loop has assigned it.
    bind = None
    try:
        for elem in [
            table.select,
            lambda **kwargs: sa.func.current_timestamp(**kwargs).select(),
            lambda **kwargs: text("select * from test_table", **kwargs),
        ]:
            for bind in (
                testing.db,
                testing.db.connect(),
            ):
                try:
                    e = elem(bind=bind)
                    assert e.bind is bind
                    e.execute().close()
                finally:
                    # Only the Connection needs closing, not the engine.
                    if isinstance(bind, engine.Connection):
                        bind.close()

            # With no bind at all, execution must be refused.
            e = elem()
            assert e.bind is None
            assert_raises(
                exc.UnboundExecutionError,
                e.execute
            )
    finally:
        if isinstance(bind, engine.Connection):
            bind.close()
        metadata.drop_all(bind=testing.db)
开发者ID:23andMe,项目名称:sqlalchemy,代码行数:34,代码来源:test_bind.py
示例17: test_unicode_warnings
def test_unicode_warnings(self):
    """Run a memory-profiled select that compares a Unicode column with a
    plain string, using a different string on every call."""
    metadata = MetaData(self.engine)
    id_column = Column(
        "col1",
        Integer,
        primary_key=True,
        test_needs_autoincrement=True,
    )
    table1 = Table(
        "mytable",
        metadata,
        id_column,
        Column("col2", Unicode(30)),
    )
    metadata.create_all()

    # One-element list so the nested function can advance the counter.
    counter = [1]

    # Original note: the repeat count is cranked way up so that pysqlite can
    # be seen clearing out its internal buffer, allowing the test to pass.
    @testing.emits_warning()
    @profile_memory()
    def go():
        # Execute with a non-unicode object. A warning is emitted, and
        # that warning shouldn't clog up memory.
        query = table1.select().where(table1.c.col2 == "foo%d" % counter[0])
        self.engine.execute(query)
        counter[0] += 1

    try:
        go()
    finally:
        metadata.drop_all()
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:35,代码来源:test_memusage.py
示例18: create_base_tables
def create_base_tables(self):
    """Define the ``random_quote``, ``exploit_type`` and ``exploit`` tables
    and create whichever of them do not exist yet (``checkfirst=True``)."""
    metadata = MetaData(bind=self.create_engine())

    def id_column():
        # Each table needs its own Column object for the shared id layout.
        return Column('id', Integer, primary_key=True, nullable=False,
                      autoincrement=True)

    Table(
        'random_quote', metadata,
        id_column(),
        Column('quote', Text, nullable=False),
    )

    Table(
        'exploit_type', metadata,
        id_column(),
        Column('name', String(128), nullable=False),
        Column('short_name', String(32), nullable=False),
    )

    Table(
        'exploit', metadata,
        id_column(),
        Column('type_id', Integer, ForeignKey(ExploitType.id),
               nullable=False),
        Column('validator_id', Integer, nullable=False),
        Column('name', String(128), nullable=False),
        Column('version', String(64), nullable=False),
        Column('url', String(128), nullable=False),
        Column('request_method', String(12), nullable=False, default='GET'),
        Column('exploit_url', String(128), nullable=False),
        Column('exploit_body', Text, nullable=True),
        Column('exploit_headers', Text, nullable=True),
        Column('is_url_encode', Boolean, nullable=False, default=False),
        Column('is_authenticated', Boolean, nullable=False, default=False),
    )

    metadata.create_all(checkfirst=True)
开发者ID:cinno,项目名称:vulnpress,代码行数:28,代码来源:db.py
示例19: populate_main_sql_testdatabase
def populate_main_sql_testdatabase(engine):
    """Create the ``events`` table and load it from the BSON fixture.

    Reads ``data/events_500.bson`` (relative to ``file_dir``), keeps the
    ``source_ip``, ``source_port``, ``request.url`` and ``pattern`` fields of
    every decoded document, and bulk-inserts them into ``events``.

    :param engine: SQLAlchemy engine used to create the table and insert.
    """
    meta = MetaData()
    table = Table('events', meta,
                  Column('id', Integer, primary_key=True, ),
                  Column('time', String(30)),
                  Column('source_ip', String(30)),
                  Column('source_port', String(30)),
                  Column('request_url', String(500)),
                  Column('request_raw', String(65536)),
                  Column('pattern', String(20)),
                  Column('filename', String(500)),
                  )
    meta.create_all(engine)

    # BSON is binary data: open in 'rb' so no newline translation or text
    # decoding is applied, and let the context manager close the handle.
    bson_path = os.path.join(file_dir, 'data/events_500.bson')
    with open(bson_path, 'rb') as bson_file:
        data = bson_file.read()

    insert_dicts = [
        {"source_ip": item["source_ip"],
         "source_port": item["source_port"],
         "request_url": item["request"]["url"],
         "pattern": item["pattern"]}
        for item in bson.decode_all(data)
    ]

    conn = engine.connect()
    try:
        conn.execute(table.insert(), insert_dicts)
    finally:
        # Always release the connection, even if the insert fails.
        conn.close()

    # Reported only after the insert succeeded. The parenthesised
    # single-argument form prints the same text on Python 2 and 3.
    print("Inserted: {0}".format(len(insert_dicts)))
开发者ID:marksee,项目名称:glastopf,代码行数:29,代码来源:helpers.py
示例20: upgrade
def upgrade(migrate_engine):
    """Create the ``transfers`` table.

    Each row references ``volumes.id`` through its non-nullable
    ``volume_id`` column.

    :param migrate_engine: engine that the migration metadata is bound to.
    :raises Exception: anything raised by ``create()``; it is logged and
        then re-raised unchanged.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    # Reflect the referenced table into this MetaData; presumably this is
    # what lets the 'volumes.id' foreign key below resolve.
    Table('volumes', meta, autoload=True)

    table_columns = [
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean),
        Column('id', String(36), primary_key=True, nullable=False),
        Column('volume_id', String(length=36), ForeignKey('volumes.id'),
               nullable=False),
        Column('display_name', String(length=255)),
        Column('salt', String(length=255)),
        Column('crypt_hash', String(length=255)),
        Column('expires_at', DateTime(timezone=False)),
    ]
    transfers = Table('transfers', meta, *table_columns,
                      mysql_engine='InnoDB',
                      mysql_charset='utf8')

    try:
        transfers.create()
    except Exception:
        # Record which table failed, then let the migration abort.
        LOG.error(_("Table |%s| not created!"), repr(transfers))
        raise
开发者ID:KumaHo,项目名称:cinder,代码行数:29,代码来源:010_add_transfers_table.py
注：本文中的sqlalchemy.MetaData类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论