• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python schema.MetaData类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.schema.MetaData的典型用法代码示例。如果您正苦于以下问题：Python MetaData类的具体用法？Python MetaData怎么用？Python MetaData使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了MetaData类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

class Database:
    """Holds an engine plus reflected metadata for a database URL.

    On construction the schema is reflected and ``make_dsm_table`` is
    started in its own thread for every reflected table.  ``make_engine``
    and ``make_dsm_table`` are not defined in this excerpt.
    """

    # Error text that makes ``execute`` retry instead of failing.
    _LOCK_WAIT_TIMEOUT = (
        "(OperationalError) (1205, 'Lock wait timeout exceeded; "
        "try restarting transaction')"
    )

    def __init__(self, url, config=None):
        """Reflect the database at ``url`` and initialise all tables.

        :param url: database URL handed to ``make_engine``.
        :param config: optional configuration object, stored as-is.
        """
        self.url = url
        self.engine = self.make_engine(url)
        self.metadata = MetaData(bind=self.engine)
        self.metadata.reflect()

        self.config = config

        # parallel table init
        self.tables_lock = threading.Lock()
        self.tables = {}
        threads = []
        for table in self.metadata.sorted_tables:
            t = threading.Thread(target=self.make_dsm_table, args=(table,))
            t.start()
            threads.append(t)
        # Wait for every worker before the constructor returns.
        for t in threads:
            t.join()

    def execute(self, qry):
        """Run ``qry`` on the engine, retrying on lock-wait timeouts.

        Every exception is printed.  An exception whose message equals the
        lock-wait-timeout text causes another attempt; any other exception
        is re-raised unchanged.

        :param qry: statement passed to ``self.engine.execute``.
        :returns: whatever ``self.engine.execute`` returns.
        """
        # A loop (rather than recursion) keeps repeated timeouts from
        # growing the call stack.
        while True:
            try:
                return self.engine.execute(qry)
            except Exception as e:
                print(e)
                # ``message`` only exists on Python 2 style exceptions;
                # fall back to the string form elsewhere.
                message = getattr(e, "message", str(e))
                if message != self._LOCK_WAIT_TIMEOUT:
                    raise
开发者ID:livingbio,项目名称:Data-Science-Machine,代码行数:31,代码来源:database.py


示例2: upgrade

def upgrade(migrate_engine):
    """Add a nullable ``parent_id`` String(36) column to ``backups``.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine

    # Build the new column, reflect the existing table, then attach it.
    parent_id = Column('parent_id', String(36), nullable=True)
    backups_table = Table('backups', metadata, autoload=True)
    backups_table.create_column(parent_id)
开发者ID:AlexeyDeyneko,项目名称:trove,代码行数:7,代码来源:022_add_backup_parent_id.py


示例3: upgrade

def upgrade(migrate_engine):
    """Drop and recreate the ``agent_heartbeats`` table.

    The replacement definition carries the desired columns, indexes and
    constraints.  A failure to drop the existing table is logged and
    ignored so that the create step still runs.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    # new table with desired columns, indexes, and constraints
    new_agent_heartbeats = Table(
        'agent_heartbeats', meta,
        Column('id', String(36), primary_key=True, nullable=False),
        Column('instance_id', String(36),
               nullable=False, unique=True, index=True),
        Column('guest_agent_version', String(255), index=True),
        Column('deleted', Boolean(), index=True),
        Column('deleted_at', DateTime()),
        Column('updated_at', DateTime(), nullable=False))

    # original table from migration 005_heartbeat.py
    # NOTE(review): this reuses the same table name on the same MetaData as
    # the definition above -- confirm that it yields the intended object.
    previous_agent_heartbeats = Table('agent_heartbeats', meta, autoload=True)

    try:
        drop_tables([previous_agent_heartbeats])
    except OperationalError as e:
        # ``warning`` replaces the deprecated ``warn`` alias.
        logger.warning("This table may have been dropped by some other means.")
        logger.warning(e)

    create_tables([new_agent_heartbeats])
开发者ID:Tesora,项目名称:tesora-trove,代码行数:25,代码来源:028_recreate_agent_heartbeat.py


示例4: reflect

def reflect(engine, models, schema=None):
    """Reflect the database and map model classes onto its tables.

    For each reflected table the CamelCase form of its name is looked up
    as an attribute of ``models``.  When found, the class is mapped to the
    table; otherwise a message is written to ``stderr``.  Existing mappers
    are cleared before any new mapping is made.

    :param engine: engine used for reflection and for the session factory.
    :param models: object (e.g. a module) holding the model classes.
    :param schema: optional schema name; when given, ``"<schema>."`` is
        removed from the table names used as keys.
    :returns: tuple ``(mappers, tables, Session)`` where ``mappers`` maps
        model names to mappers, ``tables`` maps table names to tables and
        ``Session`` is the ``sessionmaker`` result.
    """
    metadata = MetaData()
    metadata.bind = engine

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=SAWarning)
        metadata.reflect(schema=schema, views=False)

    if schema is not None:
        # Reflected keys are schema-qualified; strip the qualifier.
        prefix = str(schema) + "."
        # ``items()`` works on both Python 2 and 3, unlike ``iteritems()``.
        tables = {table_name.replace(prefix, ""): table
                  for table_name, table in metadata.tables.items()}
    else:
        tables = metadata.tables

    clear_mappers()

    mappers = {}
    for table_name, table in tables.items():
        modelname = "".join(word.capitalize()
                            for word in table_name.split("_"))

        try:
            model = getattr(models, modelname)
        except AttributeError:
            stderr.write("Missing model for table %s\n" % table_name)
        else:
            mappers[modelname] = mapper(model, table)

    Session = sessionmaker(bind=engine, autocommit=False, autoflush=True)

    return mappers, tables, Session
开发者ID:petrushev,项目名称:ideasphere,代码行数:30,代码来源:__init__.py


示例5: reflect_table

def reflect_table(engine, klass):
    """Reflect the database and map ``klass`` onto its table.

    The names returned by ``klass.tables()`` are tried in order; the first
    one present among the reflected tables is used.

    :param engine: engine to reflect from.
    :param klass: class providing ``tables()`` and ``column_prefix()``.
    :returns: the table that ``klass`` was mapped to.
    :raises DatabaseError: if reflection fails with an OperationalError,
        or if none of the candidate tables exists.
    """
    try:
        metadata = MetaData()
        metadata.reflect(bind=engine)
    except OperationalError as exc:
        raise DatabaseError(error=exc.orig.args[1], code=exc.orig.args[0])

    # First candidate name that was actually reflected, if any.
    matched = next(
        (metadata.tables[name]
         for name in klass.tables()
         if name in metadata.tables),
        None)

    if matched is None:
        raise DatabaseError(error="Invalid schema. Table not found",
                            code="-1")

    # Map table schema into klass
    prefix = klass.column_prefix()
    mapper(klass, matched, column_prefix=prefix)

    return matched
开发者ID:geekygirldawn,项目名称:sortinghat,代码行数:26,代码来源:database.py


示例6: migrate

def migrate(engine, connection, revmap):
    """Given engine, connection and revision map, go through the
    ticket descriptions and comments and migrate the svn revisions to
    git hashes.

    Both tables are processed inside one transaction on ``connection``.
    On any exception the transaction is rolled back and ``die`` is called
    with the error; on success the number of migrated records is printed.

    """
    metadata = MetaData()
    metadata.bind = engine

    tickets = Table('ticket', metadata, autoload=True)
    changes = Table('ticket_change', metadata, autoload=True)

    trans = connection.begin()
    try:
        count = migrate_table(connection, revmap,
            tickets, [tickets.c.id],
            [tickets.c.description]
        )
        count += migrate_table(connection, revmap,
            changes, [changes.c.ticket, changes.c.time, changes.c.field],
            [changes.c.newvalue]
        )

        trans.commit()
    except Exception as e:
        trans.rollback()
        die("Migration error: %s" % repr(e), "Changes were rolled back")
    else:
        # Reported outside the try so that a failure here cannot trigger a
        # rollback message after the commit has already happened.
        print("Migrated %i records" % count)
开发者ID:seantis,项目名称:git-svn-trac,代码行数:31,代码来源:git-svn-trac.py


示例7: main

def main():
    """Command line entry point: generate model code from a database.

    Parses the command line, reflects the database at the given URL and
    renders the generated code to ``--outfile`` (UTF-8) or to stdout.
    With ``--version`` only the version is printed.  Without a URL an
    error plus the help text is printed and the function returns.
    """
    parser = argparse.ArgumentParser(description='Generates SQLAlchemy model code from an existing database.')
    parser.add_argument('url', nargs='?', help='SQLAlchemy url to the database')
    parser.add_argument('--version', action='store_true', help="print the version number and exit")
    parser.add_argument('--schema', help='load tables from an alternate schema')
    parser.add_argument('--tables', help='tables to process (comma-separated, default: all)')
    parser.add_argument('--noviews', action='store_true', help="ignore views")
    parser.add_argument('--noindexes', action='store_true', help='ignore indexes')
    parser.add_argument('--noconstraints', action='store_true', help='ignore constraints')
    parser.add_argument('--nojoined', action='store_true', help="don't autodetect joined table inheritance")
    parser.add_argument('--noinflect', action='store_true', help="don't try to convert tables names to singular form")
    parser.add_argument('--noclasses', action='store_true', help="don't generate classes, only tables")
    parser.add_argument('--alwaysclasses', action='store_true', help="always generate classes")
    parser.add_argument('--nosequences', action='store_true', help="don't auto-generate postgresql sequences")
    parser.add_argument('--outfile', help='file to write output to (default: stdout)')
    args = parser.parse_args()

    if args.version:
        print(sqlacodegen.version)
        return
    if not args.url:
        print('You must supply a url\n', file=sys.stderr)
        parser.print_help()
        return

    engine = create_engine(args.url)
    metadata = MetaData(engine)
    tables = args.tables.split(',') if args.tables else None
    metadata.reflect(engine, args.schema, not args.noviews, tables)
    generator = CodeGenerator(metadata, args.noindexes, args.noconstraints, args.nojoined, args.noinflect,
                              args.noclasses, args.alwaysclasses, args.nosequences)

    # Open the output only once the generator exists, so a reflection or
    # set-up failure does not truncate an existing file.
    outfile = codecs.open(args.outfile, 'w', encoding='utf-8') if args.outfile else sys.stdout
    try:
        generator.render(outfile)
    finally:
        # Close files we opened ourselves; never close stdout.
        if outfile is not sys.stdout:
            outfile.close()
开发者ID:rflynn,项目名称:sqlacodegen,代码行数:33,代码来源:main.py


示例8: upgrade

def upgrade(migrate_engine):
    """Create the ``share_snapshots`` table.

    If creation fails, the error is logged and the exception re-raised.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine

    # Reflect ``shares`` into the metadata; the returned object itself is
    # not used (presumably needed so ForeignKey('shares.id') can resolve).
    Table('shares', metadata, autoload=True)

    snapshot_columns = [
        Column('created_at', DateTime),
        Column('updated_at', DateTime),
        Column('deleted_at', DateTime),
        Column('deleted', Boolean),
        Column('id', String(length=36), primary_key=True, nullable=False),
        Column('user_id', String(length=255)),
        Column('project_id', String(length=255)),
        Column('share_id', String(36), ForeignKey('shares.id'),
               nullable=False),
        Column('size', Integer),
        Column('status', String(length=255)),
        Column('progress', String(length=255)),
        Column('display_name', String(length=255)),
        Column('display_description', String(length=255)),
        Column('share_size', Integer),
        Column('share_proto', String(length=255)),
        Column('export_location', String(255)),
    ]
    snapshots_table = Table('share_snapshots', metadata, *snapshot_columns,
                            mysql_engine='InnoDB')

    try:
        snapshots_table.create()
    except Exception:
        LOG.error(_("Table %r not created!"), snapshots_table)
        raise
开发者ID:aostapenko,项目名称:manila,代码行数:32,代码来源:004_add_share_snapshot_table.py


示例9: test_clone_table_adds_or_deletes_columns

    def test_clone_table_adds_or_deletes_columns(self):
        """``clone_table`` adds ``newcols`` and leaves out ``ignorecols``."""
        metadata = MetaData()
        metadata.bind = self.engine

        # Source table with an id plus one column that will be ignored.
        source = Table(
            'dummy', metadata,
            Column('id', String(36), primary_key=True, nullable=False),
            Column('A', Boolean, default=False))
        source.create()

        added = [Column('B', Boolean, default=False),
                 Column('C', String(255), default='foobar')]
        dropped = [source.c.A.name]
        cloned = migrate_utils.clone_table('new_dummy', source, metadata,
                                           newcols=added,
                                           ignorecols=dropped)

        cloned_names = [col.name for col in cloned.columns]

        # Expect id, B and C -- and no A.
        self.assertEqual(3, len(cloned_names))
        self.assertIsNotNone(cloned.c.B)
        self.assertIsNotNone(cloned.c.C)
        self.assertNotIn('A', cloned_names)
开发者ID:aaratn,项目名称:heat,代码行数:29,代码来源:test_utils.py


示例10: downgrade

def downgrade(migrate_engine):
    """Drop the table produced by ``define_component_config_table``.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    print("036 downgrade")
    metadata = MetaData()
    metadata.bind = migrate_engine

    component_config = define_component_config_table(metadata)
    drop_tables([component_config])
开发者ID:openstack,项目名称:daisycloud-core,代码行数:7,代码来源:036_modify_template_field_type.py


示例11: upgrade

def upgrade(migrate_engine):
    """Add a ``network_name`` String(255) column to ``ip_blocks``.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine

    # Reflect the existing table and attach the new column to it.
    ip_blocks_table = Table('ip_blocks', metadata, autoload=True)
    ip_blocks_table.create_column(Column('network_name', String(255)))
开发者ID:blamarvt,项目名称:melange,代码行数:7,代码来源:003_add_network_label_to_ip_blocks.py


示例12: test_insert_table

def test_insert_table(engine_testaccount):
    """Insert two rows with one multi-value insert and read the first back.

    The ``users`` table is created up front; the connection is closed and
    the table dropped afterwards, whether or not the assertions pass.
    """
    metadata = MetaData()
    users = Table(
        'users', metadata,
        Column('id', Integer, Sequence('user_id_seq'), primary_key=True),
        Column('name', String),
        Column('fullname', String),
    )
    metadata.create_all(engine_testaccount)

    rows = [
        {'id': n, 'name': 'testname%d' % n, 'fullname': 'fulltestname%d' % n}
        for n in (1, 2)
    ]
    connection = engine_testaccount.connect()
    try:
        # using multivalue insert
        connection.execute(users.insert(rows))
        query = select([users]).order_by('id')
        first = connection.execute(query).fetchone()
        assert first['name'] == 'testname1'
    finally:
        connection.close()
        users.drop(engine_testaccount)
开发者ID:snowflakedb,项目名称:snowflake-sqlalchemy,代码行数:30,代码来源:test_multivalues_insert.py


示例13: downgrade

def downgrade(migrate_engine):
    """Drop the template-config-roles table and alter column types.

    After dropping the table built by
    ``define_template_config_roles_table``, several columns in five
    reflected tables are altered to ``String`` types of fixed length.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    print("042 downgrade")
    metadata = MetaData()
    metadata.bind = migrate_engine

    drop_tables([define_template_config_roles_table(metadata)])

    # (table name, ((column name, String length), ...)), processed in order.
    alterations = (
        ('configs', (('template_config_id', 36),)),
        ('template_config', (('id', 36), ('name', 50))),
        ('template_func', (('id', 36), ('name', 36))),
        ('template_func_configs', (('func_id', 36), ('config_id', 36))),
        ('config_service', (('config_id', 36),)),
    )
    for table_name, columns in alterations:
        reflected = Table(table_name, metadata, autoload=True)
        for column_name, length in columns:
            getattr(reflected.c, column_name).alter(type=String(length))
开发者ID:openstack,项目名称:daisycloud-core,代码行数:33,代码来源:042_alter_config_template.py


示例14: downgrade

def downgrade(migrate_engine):
    """Drop the table produced by ``define_neutron_backend_table``.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    print("033 downgrade")
    metadata = MetaData()
    metadata.bind = migrate_engine

    neutron_backend = define_neutron_backend_table(metadata)
    drop_tables([neutron_backend])
开发者ID:openstack,项目名称:daisycloud-core,代码行数:7,代码来源:033_add_neutron_backend_table.py


示例15: downgrade

def downgrade(migrate_engine):
    """Move task info back into ``tasks`` and drop ``task_info``.

    A Text column is created on ``tasks`` for every name in
    ``TASKS_MIGRATE_COLUMNS``.  Each ``task_info`` row's ``input``,
    ``result`` and ``message`` are then written to the ``tasks`` row whose
    id equals its ``task_id``, and finally ``task_info`` is dropped.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine

    tasks = Table('tasks', metadata, autoload=True)
    info_table = Table('task_info', metadata, autoload=True)

    for name in TASKS_MIGRATE_COLUMNS:
        Column(name, Text()).create(tasks)

    # Fetch every row first, then issue one UPDATE per row.
    for record in info_table.select().execute().fetchall():
        payload = {
            'input': record.input,
            'result': record.result,
            'message': record.message
        }
        statement = tasks.update(values=payload)
        statement = statement.where(tasks.c.id == record.task_id)
        statement.execute()

    drop_tables([info_table])
开发者ID:Web5design,项目名称:glance,代码行数:26,代码来源:032_add_task_info_table.py


示例16: upgrade

def upgrade(migrate_engine):
    """Add a ``max_allocation`` Integer column to ``ip_blocks``.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine

    # Reflect the existing table and attach the new column to it.
    ip_blocks_table = Table('ip_blocks', metadata, autoload=True)
    ip_blocks_table.create_column(Column('max_allocation', Integer()))
开发者ID:blamarvt,项目名称:melange,代码行数:7,代码来源:005_add_ip_block_max_allocation.py


示例17: upgrade

def upgrade(migrate_engine):
    """Add ``service_type`` to ``instances`` and set it to ``'mysql'``.

    The update has no WHERE clause, so it applies to every row.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    metadata = MetaData()
    metadata.bind = migrate_engine
    instances_table = Table('instances', metadata, autoload=True)
    instances_table.create_column(Column('service_type', String(36)))
    backfill = instances_table.update().values({'service_type': 'mysql'})
    backfill.execute()
开发者ID:Tesora,项目名称:tesora-trove,代码行数:7,代码来源:015_add_service_type.py


示例18: upgrade

def upgrade(migrate_engine):
    """Create all tables of the initial schema (migration 001).

    Each ``define_*_table`` helper is called with the shared metadata, in
    the order listed, and the resulting tables go to ``create_tables``.

    :param migrate_engine: engine the migration metadata is bound to.
    """
    print("001 upgrade")
    metadata = MetaData()
    metadata.bind = migrate_engine
    # Order is significant: it is the order handed to create_tables().
    table_factories = (
        define_hosts_table,
        define_discover_hosts_table,
        define_clusters_table,
        define_cluster_hosts_table,
        define_networks_table,
        define_ip_ranges_table,
        define_host_interfaces_table,
        define_config_sets_table,
        define_components_table,
        define_services_table,
        define_roles_table,
        define_host_roles_table,
        define_service_roles_table,
        define_config_files_table,
        define_configs_table,
        define_config_set_items_table,
        define_config_historys_table,
        define_tasks_table,
        define_task_infos_table,
        define_repositorys_table,
        define_users_table,
        define_versions_table,
        define_assigned_networks_table,
        define_logic_networks_table,
        define_routers_table,
        define_subnets_table,
        define_float_ip_ranges_table,
        define_dns_nameservers_table,
        define_service_disks_table,
        define_cinder_volumes_table,
    )
    create_tables([factory(metadata) for factory in table_factories])
开发者ID:openstack,项目名称:daisycloud-core,代码行数:35,代码来源:001_add_daisy_tables.py


示例19: copy_star_schema

 def copy_star_schema(cls, bind=None):
     """Copy the tables named in ``cls.data_tables`` into a new MetaData.

     :param bind: optional bind; assigned to the new metadata when truthy.
     :returns: the new MetaData holding the copied tables.
     """
     copied = MetaData()
     for table_name in cls.data_tables:
         source_table = i2b2_star.metadata.tables[table_name]
         source_table.tometadata(copied)
     if bind:
         copied.bind = bind
     return copied
开发者ID:UTHSCSA-CIRD,项目名称:databuilder,代码行数:7,代码来源:dfbuilder.py


示例20: read_sql_table

def read_sql_table(table_name, con, index_col=None, coerce_float=True,
                   parse_dates=None, columns=None):
    """Read SQL database table into a DataFrame.

    Given a table name and an SQLAlchemy engine, returns a DataFrame.
    This function does not support DBAPI connections.

    Parameters
    ----------
    table_name : string
        Name of SQL table in database
    con : SQLAlchemy engine
        Sqlite DBAPI connection mode not supported
    index_col : string, optional
        Column to set as index
    coerce_float : boolean, default True
        Attempt to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Can result in loss of Precision.
    parse_dates : list or dict
        - List of column names to parse as dates
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`
          Especially useful with databases without native Datetime support,
          such as SQLite
    columns : list
        List of column names to select from sql table

    Returns
    -------
    DataFrame

    Raises
    ------
    NotImplementedError
        If ``con`` is not an SQLAlchemy engine.
    ValueError
        If the table cannot be reflected or read.

    See also
    --------
    read_sql_query : Read SQL query into a DataFrame.
    read_sql

    """
    if not _is_sqlalchemy_engine(con):
        raise NotImplementedError("read_sql_table only supported for "
                                  "SQLAlchemy engines.")
    import sqlalchemy
    from sqlalchemy.schema import MetaData
    meta = MetaData(con)
    try:
        # Reflect only the requested table; a missing table is reported
        # by SQLAlchemy as InvalidRequestError.
        meta.reflect(only=[table_name])
    except sqlalchemy.exc.InvalidRequestError:
        raise ValueError("Table %s not found" % table_name)

    pandas_sql = PandasSQLAlchemy(con, meta=meta)
    table = pandas_sql.read_table(
        table_name, index_col=index_col, coerce_float=coerce_float,
        parse_dates=parse_dates, columns=columns)

    if table is None:
        # Same single-argument message as above; passing ``con`` as a
        # second argument made the error render as a tuple.
        raise ValueError("Table %s not found" % table_name)
    return table
开发者ID:bxhunter,项目名称:pandas,代码行数:60,代码来源:sql.py



注:本文中的sqlalchemy.schema.MetaData类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python schema.Table类代码示例发布时间:2022-05-27
下一篇:
Python schema.Index类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap