本文整理汇总了Python中sqlalchemy.schema.MetaData类的典型用法代码示例。如果您正苦于以下问题：Python MetaData类的具体用法？Python MetaData怎么用？Python MetaData使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了MetaData类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
class Database:
    """Hold a SQLAlchemy engine together with its reflected schema.

    Only ``__init__`` and ``execute`` are visible in this excerpt;
    ``make_engine`` and ``make_dsm_table`` are called here but defined
    elsewhere on the class.
    """

    # Text of MySQL error 1205. Matching it as a substring of ``str(e)``
    # works on Python 3 (exceptions have no ``.message`` there) and does not
    # depend on how the SQLAlchemy version in use prefixes the driver error.
    _LOCK_WAIT_TIMEOUT = "Lock wait timeout exceeded; try restarting transaction"

    def __init__(self, url, config=None):
        """Connect to ``url``, reflect the schema and initialise every table.

        :param url: database URL handed to ``self.make_engine``.
        :param config: optional configuration object, stored as-is.
        """
        self.url = url
        self.engine = self.make_engine(url)
        self.metadata = MetaData(bind=self.engine)
        self.metadata.reflect()
        self.config = config

        # Parallel table init: one thread per reflected table.
        # NOTE(review): tables_lock / tables are presumably used by
        # make_dsm_table to register its result -- confirm against that method.
        self.tables_lock = threading.Lock()
        self.tables = {}
        threads = []
        for table in self.metadata.sorted_tables:
            t = threading.Thread(target=self.make_dsm_table, args=(table,))
            t.start()
            threads.append(t)
        # Plain loop: the joins are side effects, not a list worth building.
        for t in threads:
            t.join()

    def execute(self, qry):
        """Execute ``qry`` on the engine, retrying on lock-wait timeouts.

        A failure whose text contains the MySQL "Lock wait timeout exceeded"
        message is printed and the query is attempted again; any other
        failure is printed and re-raised with its original traceback.

        :param qry: statement passed straight to ``self.engine.execute``.
        :returns: whatever ``self.engine.execute`` returns.
        """
        # Loop instead of recursing so that a long run of timeouts cannot
        # exhaust the interpreter's recursion limit.
        while True:
            try:
                return self.engine.execute(qry)
            except Exception as e:
                print(e)
                if self._LOCK_WAIT_TIMEOUT not in str(e):
                    # Bare raise keeps the original traceback intact.
                    raise
开发者ID:livingbio,项目名称:Data-Science-Machine,代码行数:31,代码来源:database.py
示例2: upgrade
def upgrade(migrate_engine):
    """Add a nullable ``parent_id`` column to the ``backups`` table.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine

    # New column: a 36-character string that may be NULL.
    parent_id = Column('parent_id', String(36), nullable=True)

    # Load the existing table definition, then attach the column to it.
    backups_table = Table('backups', metadata, autoload=True)
    backups_table.create_column(parent_id)
开发者ID:AlexeyDeyneko,项目名称:trove,代码行数:7,代码来源:022_add_backup_parent_id.py
示例3: upgrade
def upgrade(migrate_engine):
    """Drop ``agent_heartbeats`` and create it again with the new layout.

    An ``OperationalError`` raised while dropping is logged as a warning
    and does not stop the table from being created.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    # Desired columns, indexes, and constraints of the new table.
    desired_columns = [
        Column('id', String(36), primary_key=True, nullable=False),
        Column('instance_id', String(36),
               nullable=False, unique=True, index=True),
        Column('guest_agent_version', String(255), index=True),
        Column('deleted', Boolean(), index=True),
        Column('deleted_at', DateTime()),
        Column('updated_at', DateTime(), nullable=False),
    ]
    new_agent_heartbeats = Table('agent_heartbeats', meta, *desired_columns)

    # Original table from migration 005_heartbeat.py.
    previous_agent_heartbeats = Table('agent_heartbeats', meta, autoload=True)

    try:
        drop_tables([previous_agent_heartbeats])
    except OperationalError as exc:
        # Best effort: a failed drop is reported, not fatal.
        logger.warn("This table may have been dropped by some other means.")
        logger.warn(exc)

    create_tables([new_agent_heartbeats])
开发者ID:Tesora,项目名称:tesora-trove,代码行数:25,代码来源:028_recreate_agent_heartbeat.py
示例4: reflect
def reflect(engine, models, schema = None):
    """Reflect the database and map each table onto its model class.

    A table name is turned into a model name by capitalising every
    underscore-separated word (``user_account`` -> ``UserAccount``). Tables
    for which ``models`` has no such attribute are reported on stderr and
    left unmapped.

    :param engine: engine to reflect from and to bind the session factory to.
    :param models: object (typically a module) holding the model classes.
    :param schema: optional schema name; when given, the ``"<schema>."``
        prefix is removed from the keys of the returned table mapping.
    :returns: tuple ``(mappers, tables, Session)`` -- mappers keyed by model
        name, tables keyed by table name, and a ``sessionmaker`` factory.
    """
    metadata = MetaData()
    metadata.bind = engine

    # Reflection warnings are deliberately silenced.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category = SAWarning)
        metadata.reflect(schema = schema, views = False)

    if schema is not None:
        # Strip the schema only where it is a leading prefix; str.replace
        # would also remove the same text from the middle of a table name.
        prefix = str(schema) + "."
        tables = {}
        for table_name, table in metadata.tables.items():
            if table_name.startswith(prefix):
                table_name = table_name[len(prefix):]
            tables[table_name] = table
    else:
        tables = metadata.tables

    # Discard existing mappings so the models can be mapped again.
    clear_mappers()

    mappers = {}
    # .items() exists on Python 2 and 3; .iteritems() is Python-2-only.
    for table_name, table in tables.items():
        modelname = "".join(word.capitalize() for word in table_name.split("_"))
        try:
            model = getattr(models, modelname)
        except AttributeError:
            stderr.write("Missing model for table %s\n" % table_name)
        else:
            mappers[modelname] = mapper(model, table)

    Session = sessionmaker(bind = engine, autocommit = False, autoflush = True)
    return mappers, tables, Session
开发者ID:petrushev,项目名称:ideasphere,代码行数:30,代码来源:__init__.py
示例5: reflect_table
def reflect_table(engine, klass):
    """Map ``klass`` onto the first of its candidate tables found in the DB.

    :param engine: engine used to reflect the database schema.
    :param klass: class providing ``tables()`` (candidate table names, in
        order of preference) and ``column_prefix()``.
    :returns: the reflected table that ``klass`` was mapped to.
    :raises DatabaseError: if reflection fails with an ``OperationalError``
        or none of the candidate tables exists.
    """
    try:
        meta = MetaData()
        meta.reflect(bind=engine)
    except OperationalError as e:
        raise DatabaseError(error=e.orig.args[1], code=e.orig.args[0])

    # Pick the first candidate name that is present in the reflected schema.
    reflected = meta.tables
    matches = (reflected[name] for name in klass.tables() if name in reflected)
    table = next(matches, None)

    if table is None:
        raise DatabaseError(error="Invalid schema. Table not found",
                            code="-1")

    # Map the table schema onto klass.
    mapper(klass, table, column_prefix=klass.column_prefix())
    return table
开发者ID:geekygirldawn,项目名称:sortinghat,代码行数:26,代码来源:database.py
示例6: migrate
def migrate(engine, connection, revmap):
    """Rewrite svn revisions as git hashes in tickets and ticket changes.

    Given engine, connection and revision map, go through the ticket
    descriptions and comments and migrate the svn revisions to git hashes.
    Both tables are processed inside one transaction, which is rolled back
    if anything fails.

    :param engine: engine used to reflect the ``ticket`` and
        ``ticket_change`` tables.
    :param connection: connection on which the transaction is opened.
    :param revmap: revision map handed to ``migrate_table``.
    """
    metadata = MetaData()
    metadata.bind = engine

    tickets = Table('ticket', metadata, autoload=True)
    changes = Table('ticket_change', metadata, autoload=True)

    trans = connection.begin()
    try:
        count = migrate_table(connection, revmap,
                              tickets, [tickets.c.id],
                              [tickets.c.description]
                              )
        count += migrate_table(connection, revmap,
                               changes, [changes.c.ticket, changes.c.time, changes.c.field],
                               [changes.c.newvalue]
                               )
        trans.commit()
        print("Migrated %i records" % count)
    # "except X as e" is valid on Python 2.6+ and 3; "except X, e" is a
    # syntax error on Python 3.
    except Exception as e:
        trans.rollback()
        die("Migration error: %s" % repr(e), "Changes were rolled back")
开发者ID:seantis,项目名称:git-svn-trac,代码行数:31,代码来源:git-svn-trac.py
示例7: main
def main():
    """Command-line entry point: reflect a database and render model code.

    Parses the command line, reflects the database at the given URL and
    writes the generated code to ``--outfile`` (UTF-8) or to stdout.
    With ``--version`` only the version is printed; without a URL an error
    and the usage text are printed and the function returns.
    """
    parser = argparse.ArgumentParser(description='Generates SQLAlchemy model code from an existing database.')
    parser.add_argument('url', nargs='?', help='SQLAlchemy url to the database')
    parser.add_argument('--version', action='store_true', help="print the version number and exit")
    parser.add_argument('--schema', help='load tables from an alternate schema')
    parser.add_argument('--tables', help='tables to process (comma-separated, default: all)')
    parser.add_argument('--noviews', action='store_true', help="ignore views")
    parser.add_argument('--noindexes', action='store_true', help='ignore indexes')
    parser.add_argument('--noconstraints', action='store_true', help='ignore constraints')
    parser.add_argument('--nojoined', action='store_true', help="don't autodetect joined table inheritance")
    parser.add_argument('--noinflect', action='store_true', help="don't try to convert tables names to singular form")
    parser.add_argument('--noclasses', action='store_true', help="don't generate classes, only tables")
    parser.add_argument('--alwaysclasses', action='store_true', help="always generate classes")
    parser.add_argument('--nosequences', action='store_true', help="don't auto-generate postgresql sequences")
    parser.add_argument('--outfile', help='file to write output to (default: stdout)')
    args = parser.parse_args()

    if args.version:
        print(sqlacodegen.version)
        return
    if not args.url:
        print('You must supply a url\n', file=sys.stderr)
        parser.print_help()
        return

    engine = create_engine(args.url)
    metadata = MetaData(engine)
    tables = args.tables.split(',') if args.tables else None
    # Keyword arguments keep this call correct even if the positional order
    # of MetaData.reflect() parameters differs between SQLAlchemy versions.
    metadata.reflect(bind=engine, schema=args.schema,
                     views=not args.noviews, only=tables)

    outfile = codecs.open(args.outfile, 'w', encoding='utf-8') if args.outfile else sys.stdout
    try:
        generator = CodeGenerator(metadata, args.noindexes, args.noconstraints, args.nojoined, args.noinflect,
                                  args.noclasses, args.alwaysclasses, args.nosequences)
        generator.render(outfile)
    finally:
        # Close only a file opened here; never close sys.stdout.
        if args.outfile:
            outfile.close()
开发者ID:rflynn,项目名称:sqlacodegen,代码行数:33,代码来源:main.py
示例8: upgrade
def upgrade(migrate_engine):
    """Create the ``share_snapshots`` table.

    The existing ``shares`` table is loaded into the same metadata first;
    ``share_snapshots.share_id`` is a foreign key to ``shares.id``.
    A failure to create the table is logged and re-raised.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    # Loaded for its registration in ``meta``; the object itself is unused.
    Table('shares', meta, autoload=True)

    snapshot_columns = [
        Column('created_at', DateTime),
        Column('updated_at', DateTime),
        Column('deleted_at', DateTime),
        Column('deleted', Boolean),
        Column('id', String(length=36), primary_key=True, nullable=False),
        Column('user_id', String(length=255)),
        Column('project_id', String(length=255)),
        Column('share_id', String(36), ForeignKey('shares.id'),
               nullable=False),
        Column('size', Integer),
        Column('status', String(length=255)),
        Column('progress', String(length=255)),
        Column('display_name', String(length=255)),
        Column('display_description', String(length=255)),
        Column('share_size', Integer),
        Column('share_proto', String(length=255)),
        Column('export_location', String(255)),
    ]
    share_snapshots = Table('share_snapshots', meta, *snapshot_columns,
                            mysql_engine='InnoDB')

    try:
        share_snapshots.create()
    except Exception:
        LOG.error(_("Table %r not created!"), share_snapshots)
        raise
开发者ID:aostapenko,项目名称:manila,代码行数:32,代码来源:004_add_share_snapshot_table.py
示例9: test_clone_table_adds_or_deletes_columns
def test_clone_table_adds_or_deletes_columns(self):
    """Check that ``clone_table`` adds ``newcols`` and omits ``ignorecols``.

    Creates a two-column ``dummy`` table, clones it as ``new_dummy`` with
    columns B and C added and column A ignored, then checks the clone's
    column set.
    """
    meta = MetaData()
    meta.bind = self.engine

    source = Table(
        'dummy', meta,
        Column('id', String(36), primary_key=True, nullable=False),
        Column('A', Boolean, default=False),
    )
    source.create()

    added = [
        Column('B', Boolean, default=False),
        Column('C', String(255), default='foobar'),
    ]
    skipped = [source.c.A.name]

    new_table = migrate_utils.clone_table(
        'new_dummy', source, meta, newcols=added, ignorecols=skipped)

    col_names = [col.name for col in new_table.columns]

    # Expect id + B + C, with A left out.
    self.assertEqual(3, len(col_names))
    self.assertIsNotNone(new_table.c.B)
    self.assertIsNotNone(new_table.c.C)
    self.assertNotIn('A', col_names)
开发者ID:aaratn,项目名称:heat,代码行数:29,代码来源:test_utils.py
示例10: downgrade
def downgrade(migrate_engine):
    """Drop the table defined by ``define_component_config_table``.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    print("036 downgrade")

    metadata = MetaData()
    metadata.bind = migrate_engine

    component_config = define_component_config_table(metadata)
    drop_tables([component_config])
开发者ID:openstack,项目名称:daisycloud-core,代码行数:7,代码来源:036_modify_template_field_type.py
示例11: upgrade
def upgrade(migrate_engine):
    """Add a ``network_name`` string column to the ``ip_blocks`` table.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine

    # Load the existing table, then attach the new 255-character column.
    ip_blocks_table = Table('ip_blocks', metadata, autoload=True)
    ip_blocks_table.create_column(Column('network_name', String(255)))
开发者ID:blamarvt,项目名称:melange,代码行数:7,代码来源:003_add_network_label_to_ip_blocks.py
示例12: test_insert_table
def test_insert_table(engine_testaccount):
    """Insert two rows with one multi-value INSERT and read the first back.

    The ``users`` table is created for the test and dropped afterwards,
    whether or not the assertions pass.

    :param engine_testaccount: engine the test runs against.
    """
    metadata = MetaData()
    users = Table(
        'users', metadata,
        Column('id', Integer, Sequence('user_id_seq'), primary_key=True),
        Column('name', String),
        Column('fullname', String),
    )
    metadata.create_all(engine_testaccount)

    # Rows 1 and 2: testname<N> / fulltestname<N>.
    data = [
        {
            'id': n,
            'name': 'testname%d' % n,
            'fullname': 'fulltestname%d' % n,
        }
        for n in (1, 2)
    ]

    conn = engine_testaccount.connect()
    try:
        # using multivalue insert
        conn.execute(users.insert(data))
        query = select([users]).order_by('id')
        first_row = conn.execute(query).fetchone()
        assert first_row['name'] == 'testname1'
    finally:
        conn.close()
        users.drop(engine_testaccount)
开发者ID:snowflakedb,项目名称:snowflake-sqlalchemy,代码行数:30,代码来源:test_multivalues_insert.py
示例13: downgrade
def downgrade(migrate_engine):
    """Drop the template-config-roles table and reset column string types.

    After dropping the table built by ``define_template_config_roles_table``,
    each listed column is altered to ``String`` of the listed length.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    print("042 downgrade")

    meta = MetaData()
    meta.bind = migrate_engine
    drop_tables([define_template_config_roles_table(meta)])

    # (table name, ((column name, String length), ...)), in execution order.
    alter_plan = (
        ('configs', (('template_config_id', 36),)),
        ('template_config', (('id', 36), ('name', 50))),
        ('template_func', (('id', 36), ('name', 36))),
        ('template_func_configs', (('func_id', 36), ('config_id', 36))),
        ('config_service', (('config_id', 36),)),
    )

    for table_name, column_specs in alter_plan:
        table = Table(table_name, meta, autoload=True)
        for column_name, length in column_specs:
            column = getattr(table.c, column_name)
            column.alter(type=String(length))
开发者ID:openstack,项目名称:daisycloud-core,代码行数:33,代码来源:042_alter_config_template.py
示例14: downgrade
def downgrade(migrate_engine):
    """Drop the table defined by ``define_neutron_backend_table``.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    print("033 downgrade")

    metadata = MetaData()
    metadata.bind = migrate_engine

    neutron_backend = define_neutron_backend_table(metadata)
    drop_tables([neutron_backend])
开发者ID:openstack,项目名称:daisycloud-core,代码行数:7,代码来源:033_add_neutron_backend_table.py
示例15: downgrade
def downgrade(migrate_engine):
    """Move task details back into ``tasks`` and drop ``task_info``.

    Adds one ``Text`` column to ``tasks`` per name in
    ``TASKS_MIGRATE_COLUMNS``, copies ``input``/``result``/``message`` from
    every ``task_info`` row to the ``tasks`` row with the matching id, then
    drops ``task_info``.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    tasks_table = Table('tasks', meta, autoload=True)
    task_info_table = Table('task_info', meta, autoload=True)

    # Re-create the columns on the tasks table.
    for col_name in TASKS_MIGRATE_COLUMNS:
        Column(col_name, Text()).create(tasks_table)

    # Copy each task_info row's payload onto its parent task.
    for task_info in task_info_table.select().execute().fetchall():
        values = {
            'input': task_info.input,
            'result': task_info.result,
            'message': task_info.message
        }
        statement = tasks_table.update(values=values)
        statement = statement.where(tasks_table.c.id == task_info.task_id)
        statement.execute()

    drop_tables([task_info_table])
开发者ID:Web5design,项目名称:glance,代码行数:26,代码来源:032_add_task_info_table.py
示例16: upgrade
def upgrade(migrate_engine):
    """Add an integer ``max_allocation`` column to the ``ip_blocks`` table.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine

    # Load the existing table, then attach the new column.
    ip_blocks_table = Table('ip_blocks', metadata, autoload=True)
    ip_blocks_table.create_column(Column('max_allocation', Integer()))
开发者ID:blamarvt,项目名称:melange,代码行数:7,代码来源:005_add_ip_block_max_allocation.py
示例17: upgrade
def upgrade(migrate_engine):
    """Add ``service_type`` to ``instances`` and set it to ``'mysql'``.

    The update has no WHERE clause, so it applies to every row.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine

    instances_table = Table('instances', metadata, autoload=True)
    instances_table.create_column(Column('service_type', String(36)))

    # Backfill the new column.
    backfill = instances_table.update().values({'service_type': 'mysql'})
    backfill.execute()
开发者ID:Tesora,项目名称:tesora-trove,代码行数:7,代码来源:015_add_service_type.py
示例18: upgrade
def upgrade(migrate_engine):
    """Define every initial table on fresh metadata and create them all.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    print("001 upgrade")

    meta = MetaData()
    meta.bind = migrate_engine

    # Table factories, called with ``meta`` in exactly this order.
    table_definers = (
        define_hosts_table,
        define_discover_hosts_table,
        define_clusters_table,
        define_cluster_hosts_table,
        define_networks_table,
        define_ip_ranges_table,
        define_host_interfaces_table,
        define_config_sets_table,
        define_components_table,
        define_services_table,
        define_roles_table,
        define_host_roles_table,
        define_service_roles_table,
        define_config_files_table,
        define_configs_table,
        define_config_set_items_table,
        define_config_historys_table,
        define_tasks_table,
        define_task_infos_table,
        define_repositorys_table,
        define_users_table,
        define_versions_table,
        define_assigned_networks_table,
        define_logic_networks_table,
        define_routers_table,
        define_subnets_table,
        define_float_ip_ranges_table,
        define_dns_nameservers_table,
        define_service_disks_table,
        define_cinder_volumes_table,
    )
    tables = [define(meta) for define in table_definers]
    create_tables(tables)
开发者ID:openstack,项目名称:daisycloud-core,代码行数:35,代码来源:001_add_daisy_tables.py
示例19: copy_star_schema
def copy_star_schema(cls, bind=None):
    """Copy the tables named in ``cls.data_tables`` into new metadata.

    :param bind: optional engine/connection; when truthy it is set as the
        new metadata's ``bind``.
    :returns: a new ``MetaData`` holding copies of those tables.
    """
    copied = MetaData()
    for table_name in cls.data_tables:
        source_table = i2b2_star.metadata.tables[table_name]
        source_table.tometadata(copied)
    if bind:
        copied.bind = bind
    return copied
开发者ID:UTHSCSA-CIRD,项目名称:databuilder,代码行数:7,代码来源:dfbuilder.py
示例20: read_sql_table
def read_sql_table(table_name, con, index_col=None, coerce_float=True,
                   parse_dates=None, columns=None):
    """Read SQL database table into a DataFrame.

    Given a table name and an SQLAlchemy engine, returns a DataFrame.
    This function does not support DBAPI connections.

    Parameters
    ----------
    table_name : string
        Name of SQL table in database
    con : SQLAlchemy engine
        Sqlite DBAPI connection mode not supported
    index_col : string, optional
        Column to set as index
    coerce_float : boolean, default True
        Attempt to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Can result in loss of precision.
    parse_dates : list or dict
        - List of column names to parse as dates
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`
          Especially useful with databases without native Datetime support,
          such as SQLite
    columns : list
        List of column names to select from sql table

    Returns
    -------
    DataFrame

    Raises
    ------
    NotImplementedError
        If ``con`` is not an SQLAlchemy engine.
    ValueError
        If the table cannot be found.

    See also
    --------
    read_sql_query : Read SQL query into a DataFrame.
    read_sql
    """
    if not _is_sqlalchemy_engine(con):
        raise NotImplementedError("read_sql_table only supported for "
                                  "SQLAlchemy engines.")
    import sqlalchemy
    from sqlalchemy.schema import MetaData
    meta = MetaData(con)
    try:
        # Reflect only the requested table; a missing table surfaces as
        # InvalidRequestError.
        meta.reflect(only=[table_name])
    except sqlalchemy.exc.InvalidRequestError:
        raise ValueError("Table %s not found" % table_name)

    pandas_sql = PandasSQLAlchemy(con, meta=meta)
    table = pandas_sql.read_table(
        table_name, index_col=index_col, coerce_float=coerce_float,
        parse_dates=parse_dates, columns=columns)

    if table is not None:
        return table
    # Same message as the reflection failure above. The engine is no longer
    # passed as a second argument, which made str(exc) render as a tuple.
    raise ValueError("Table %s not found" % table_name)
开发者ID:bxhunter,项目名称:pandas,代码行数:60,代码来源:sql.py
注：本文中的sqlalchemy.schema.MetaData类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论