• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python schema.Table类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.schema.Table 的典型用法代码示例。如果您正苦于以下问题：Python Table 类的具体用法是什么？Python Table 怎么用？有哪些 Python Table 的使用例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



下文一共展示了 Table 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_insert_table

def test_insert_table(engine_testaccount):
    """Insert two rows with one multi-value insert and check the first row.

    A ``users`` table is created on ``engine_testaccount``, populated, read
    back ordered by ``id``, and dropped again in the ``finally`` clause.
    """
    metadata = MetaData()
    users = Table(
        'users', metadata,
        Column('id', Integer, Sequence('user_id_seq'), primary_key=True),
        Column('name', String),
        Column('fullname', String),
    )
    metadata.create_all(engine_testaccount)

    rows_to_insert = [
        {'id': 1, 'name': 'testname1', 'fullname': 'fulltestname1'},
        {'id': 2, 'name': 'testname2', 'fullname': 'fulltestname2'},
    ]
    connection = engine_testaccount.connect()
    try:
        # The whole list is handed to insert(): multi-value insert.
        insert_stmt = users.insert(rows_to_insert)
        connection.execute(insert_stmt)

        ordered_query = select([users]).order_by('id')
        first_row = connection.execute(ordered_query).fetchone()
        assert first_row['name'] == 'testname1'
    finally:
        # Always release the connection and remove the test table.
        connection.close()
        users.drop(engine_testaccount)
开发者ID:snowflakedb,项目名称:snowflake-sqlalchemy,代码行数:30,代码来源:test_multivalues_insert.py


示例2: upgrade

def upgrade(migrate_engine):
    """Create the ``share_snapshots`` table.

    The existing ``shares`` table is reflected into the same metadata first,
    presumably so the ``share_id`` foreign key on ``shares.id`` can resolve.
    A creation failure is logged and then re-raised.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    # Reflected only for its side effect of registering 'shares' in ``meta``.
    Table('shares', meta, autoload=True)

    snapshot_columns = [
        Column('created_at', DateTime),
        Column('updated_at', DateTime),
        Column('deleted_at', DateTime),
        Column('deleted', Boolean),
        Column('id', String(length=36), primary_key=True, nullable=False),
        Column('user_id', String(length=255)),
        Column('project_id', String(length=255)),
        Column('share_id', String(36), ForeignKey('shares.id'),
               nullable=False),
        Column('size', Integer),
        Column('status', String(length=255)),
        Column('progress', String(length=255)),
        Column('display_name', String(length=255)),
        Column('display_description', String(length=255)),
        Column('share_size', Integer),
        Column('share_proto', String(length=255)),
        Column('export_location', String(255)),
    ]
    share_snapshots = Table('share_snapshots', meta, *snapshot_columns,
                            mysql_engine='InnoDB')

    try:
        share_snapshots.create()
    except Exception:
        LOG.error(_("Table %r not created!"), share_snapshots)
        raise
开发者ID:aostapenko,项目名称:manila,代码行数:32,代码来源:004_add_share_snapshot_table.py


示例3: upgrade

def upgrade(migrate_engine):
    """Add an integer ``max_allocation`` column to the ``ip_blocks`` table."""
    meta = MetaData()
    meta.bind = migrate_engine

    # Reflect the existing table, then attach the new column to it.
    ip_blocks = Table('ip_blocks', meta, autoload=True)
    ip_blocks.create_column(Column('max_allocation', Integer()))
开发者ID:blamarvt,项目名称:melange,代码行数:7,代码来源:005_add_ip_block_max_allocation.py


示例4: test_clone_table_adds_or_deletes_columns

    def test_clone_table_adds_or_deletes_columns(self):
        """clone_table should add ``newcols`` and leave out ``ignorecols``."""
        meta = MetaData()
        meta.bind = self.engine

        source = Table(
            'dummy', meta,
            Column('id', String(36), primary_key=True, nullable=False),
            Column('A', Boolean, default=False)
        )
        source.create()

        added = [
            Column('B', Boolean, default=False),
            Column('C', String(255), default='foobar'),
        ]
        dropped = [source.c.A.name]
        cloned = migrate_utils.clone_table(
            'new_dummy', source, meta,
            newcols=added, ignorecols=dropped)

        cloned_names = [col.name for col in cloned.columns]

        # 'id' + 'B' + 'C' remain; 'A' was listed in ignorecols.
        self.assertEqual(3, len(cloned_names))
        self.assertIsNotNone(cloned.c.B)
        self.assertIsNotNone(cloned.c.C)
        self.assertNotIn('A', cloned_names)
开发者ID:aaratn,项目名称:heat,代码行数:29,代码来源:test_utils.py


示例5: upgrade

def upgrade(migrate_engine):
    """Copy every ``servers`` row into ``pool_attributes``.

    Each server becomes one attribute row with key ``'name_server'`` whose
    value is the server name; ``pool_id`` is the module-level
    ``default_pool_id``.
    """
    meta.bind = migrate_engine

    # Reflect both tables from the database.
    servers_table = Table('servers', meta, autoload=True)
    pool_attrib_table = Table('pool_attributes', meta, autoload=True)

    # Only these server columns are carried over.
    wanted_columns = [
        servers_table.c.id,
        servers_table.c.created_at,
        servers_table.c.updated_at,
        servers_table.c.version,
        servers_table.c.name,
    ]
    server_rows = select(columns=wanted_columns).execute().fetchall()

    for row in server_rows:
        attribute = {
            'id': row.id,
            'created_at': row.created_at,
            'updated_at': row.updated_at,
            'version': row.version,
            'key': 'name_server',
            'value': row.name,
            'pool_id': default_pool_id,
        }
        pool_attrib_table.insert().execute(**attribute)
开发者ID:AnatolyZimin,项目名称:designate,代码行数:28,代码来源:049_migrate_servers.py


示例6: downgrade

def downgrade(migrate_engine):
    """Move task details back from ``task_info`` into ``tasks``.

    Re-creates each column named in ``TASKS_MIGRATE_COLUMNS`` on ``tasks``
    as ``Text``, copies ``input``/``result``/``message`` from every
    ``task_info`` row onto the task with the matching id, then drops
    ``task_info``.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    tasks_table = Table('tasks', meta, autoload=True)
    task_info_table = Table('task_info', meta, autoload=True)

    for col_name in TASKS_MIGRATE_COLUMNS:
        Column(col_name, Text()).create(tasks_table)

    for record in task_info_table.select().execute().fetchall():
        restored = {
            'input': record.input,
            'result': record.result,
            'message': record.message,
        }
        update_stmt = tasks_table.update(values=restored)
        update_stmt = update_stmt.where(tasks_table.c.id == record.task_id)
        update_stmt.execute()

    drop_tables([task_info_table])
开发者ID:Web5design,项目名称:glance,代码行数:26,代码来源:032_add_task_info_table.py


示例7: test_reflect_select

    def test_reflect_select(self, engine, connection):
        """reflecttable should be able to fill in a table from the name"""
        metadata = MetaData(bind=engine)
        reflected = Table('one_row_complex', metadata, autoload=True)
        cols = reflected.c
        self.assertEqual(len(cols), 15)
        self.assertIsInstance(cols.string, Column)

        fetched = reflected.select().execute().fetchall()
        self.assertEqual(len(fetched), 1)
        self.assertEqual(list(fetched[0]), _ONE_ROW_COMPLEX_CONTENTS)

        # Fall back to the MySQL dialect's type when sqlalchemy.types has
        # no BigInteger to import.
        try:
            from sqlalchemy.types import BigInteger
        except ImportError:
            from sqlalchemy.databases.mysql import MSBigInteger as BigInteger

        # TODO some of these types could be filled in better
        expected_types = (
            ('boolean', types.Boolean),
            ('tinyint', types.Integer),
            ('smallint', types.Integer),
            ('int', types.Integer),
            ('bigint', BigInteger),
            ('float', types.Float),
            ('double', types.Float),
            ('string', types.String),
            ('timestamp', HiveTimestamp),
            ('binary', types.String),
            ('array', types.String),
            ('map', types.String),
            ('struct', types.String),
            ('union', types.String),
            ('decimal', HiveDecimal),
        )
        for column_name, expected in expected_types:
            self.assertIsInstance(getattr(cols, column_name).type, expected)
开发者ID:KMK-ONLINE,项目名称:PyHive,代码行数:30,代码来源:test_sqlalchemy_hive.py


示例8: _exclude_columns_table

    def _exclude_columns_table(table):
        """Return a copy of ``table`` without the excluded columns.

        The copy lives in a fresh ``MetaData``. Columns whose name appears in
        ``columns`` (a name defined outside this function) are left out.
        """
        stripped = Table(table.name, MetaData())
        kept = (col for col in table.columns if col.name not in columns)
        for col in kept:
            stripped.append_column(col.copy())
        return stripped
开发者ID:pwdtnx,项目名称:ouroboros,代码行数:7,代码来源:meta.py


示例9: create_table

    def create_table(self, table_name, columns, ctypes, keys):
        """
        Create a new table in the database unless it already exists.

        Input:
        table_name - name of the table to create; also stored on
                     self.table_name
        columns, ctypes, keys - forwarded to self.get_table_desc() to build
                     the column definitions

        Output:
        None. The table is created when it did not exist yet.

        Raises:
        The original exception from defining or creating the table.
        """
        # before creating a new table check if that table exists
        self.table_name = table_name
        if self.check_if_table_exists(table_name):
            print('Table already exists in the database. No need to create a new table')
            return

        # create a new table since it does not exist
        print('Table - %s does not exist. Create a new table' % (table_name))
        # get the description of the table
        table_columns = self.get_table_desc(columns, ctypes, keys)
        try:
            new_table = Table(self.table_name, self.metadata, *table_columns)
            new_table.create(checkfirst=True)
            print("Table '%s' created" % self.table_name)
        except Exception:
            print('Error while creating the table %s' % self.table_name)
            # Re-raise the real error (still an Exception subclass) so the
            # cause and traceback are not replaced by a bare Exception.
            raise
开发者ID:foss-transportationmodeling,项目名称:simtravel,代码行数:34,代码来源:database_connection.py


示例10: __init__

 def __init__(self, db, schema, table, columns=None):
     """Bind to ``db`` and create or reflect ``table`` in ``schema``.

     With ``columns`` (SQLAlchemy columns) the table is defined and created;
     without them it is loaded from the database. A falsy ``table`` leaves
     ``self.table`` as None and marks the object as dropped.
     """
     self.db = db
     self.schema = schema
     self.name = table
     self.engine = create_engine(db.url)
     self.metadata = MetaData(schema=schema)
     self.metadata.bind = self.engine

     if not table:
         # Nothing to create or reflect.
         self._is_dropped = True
         self.table = None
         return

     # http://docs.sqlalchemy.org/en/rel_1_0/core/metadata.html
     if columns:
         # Columns supplied: define the table and create it.
         self.table = SQLATable(
             table, self.metadata, *columns, schema=self.schema
         )
         self.table.create()
     else:
         # Otherwise just load the definition from the database.
         self.table = SQLATable(
             table, self.metadata, autoload=True, schema=self.schema
         )
     self.indexes = {index.name: index for index in self.table.indexes}
     self._is_dropped = False
开发者ID:smnorris,项目名称:pgdb,代码行数:25,代码来源:table.py


示例11: downgrade

def downgrade(migrate_engine):
    """Revert the secondary-zones schema changes.

    Deletes every domain whose type is ``'SECONDARY'``, drops the ``type``
    and ``transferred_at`` columns from ``domains``, drops the
    ``domain_attributes`` table and the two enum types, and on SQLite
    re-creates the ``unique_domain_name`` constraint.
    """
    meta.bind = migrate_engine

    keys = Enum(name='key', metadata=meta, *ZONE_ATTRIBUTE_KEYS)
    types = Enum(name='types', metadata=meta, *ZONE_TYPES)

    domains_attributes_table = Table('domain_attributes', meta, autoload=True)
    domains_table = Table('domains', meta, autoload=True)

    domains = select(columns=[domains_table.c.id, domains_table.c.type])\
        .where(domains_table.c.type == 'SECONDARY')\
        .execute().fetchall()

    for dom in domains:
        # Columns are reached through ``Table.c``; ``domains_table.id`` is
        # not a column accessor and would raise AttributeError.
        delete = domains_table.delete()\
            .where(domains_table.c.id == dom.id)
        delete.execute()

    domains_table.c.type.drop()
    domains_table.c.transferred_at.drop()

    domains_attributes_table.drop()
    keys.drop()
    types.drop()

    dialect = migrate_engine.url.get_dialect().name
    if dialect.startswith('sqlite'):
        constraint = UniqueConstraint(
            'name', 'deleted', name='unique_domain_name', table=domains_table)

        # Add missing unique index
        constraint.create()
开发者ID:jkhelil,项目名称:designate,代码行数:32,代码来源:052_secondary_zones.py


示例12: test_reflect_select

 def test_reflect_select(self, engine, connection):
     """reflecttable should be able to fill in a table from the name"""
     reflected = Table('one_row_complex', MetaData(bind=engine), autoload=True)
     # Presto ignores the union and decimal columns
     self.assertEqual(len(reflected.c), 15 - 2)
     self.assertIsInstance(reflected.c.string, Column)

     expected_row = [
         True,
         127,
         32767,
         2147483647,
         9223372036854775807,
         0.5,
         0.25,
         'a string',
         '1970-01-01 00:00:00.000',
         '123',
         [1, 2],
         {"1": 2, "3": 4},  # Presto converts all keys to strings so that they're valid JSON
         [1, 2],  # struct is returned as a list of elements
         # No entries for the two ignored columns (presumably '{0:1}' for
         # union and 0.1 for decimal).
     ]
     fetched = reflected.select().execute().fetchall()
     self.assertEqual(len(fetched), 1)
     self.assertEqual(list(fetched[0]), expected_row)
开发者ID:Uberi,项目名称:PyHive,代码行数:25,代码来源:test_sqlalchemy_presto.py


示例13: update_item_saved_info

def update_item_saved_info(item):
    """Add a saved item to the feed of every follower of its owner.

    Selects each ``user_id`` in ``user_following`` whose ``following_id`` is
    the item's owner and inserts one ``user_feed`` row per follower.

    Args:
        item: mapping that provides ``'owner_id'`` and ``'item_id'``.
    """
    engine = get_onitsuka_db_engine()

    item_owner_id = item['owner_id']
    item_id = item['item_id']

    user_following = Table('user_following', metaData, autoload=True, autoload_with=engine)
    s = select([user_following.c.user_id], (user_following.c.following_id == item_owner_id))

    result = engine.execute(s)
    user_feed_update_list = []
    try:
        for follower in result:
            item_owner_follower_id = follower['user_id']
            # Single-argument print() behaves the same on Python 2 and 3.
            print(item_owner_follower_id)

            user_feed_update_list.append({
                'user_id': item_owner_follower_id,
                'owner_id': item_owner_id,
                'item_id': item_id,
            })
    finally:
        # Release the cursor even if reading a row fails.
        result.close()

    user_feed_table = Table('user_feed', metaData, autoload=True, autoload_with=engine)
    if not user_feed_update_list:
        # No followers: nothing to insert. An empty parameter list would
        # leave the required bind parameters without values.
        return

    ins = user_feed_table.insert().values(user_id=bindparam('user_id'), owner_id=bindparam('owner_id'), item_id=bindparam('item_id'))
    engine.execute(ins, user_feed_update_list)
开发者ID:timewalker00,项目名称:gotwish-onitsuka-worker,代码行数:30,代码来源:dao.py


示例14: upgrade

def upgrade(migrate_engine):
    """Add a ``network_name`` String(255) column to the ``ip_blocks`` table."""
    meta = MetaData()
    meta.bind = migrate_engine

    # Reflect the existing table, then attach the new column to it.
    ip_blocks = Table('ip_blocks', meta, autoload=True)
    ip_blocks.create_column(Column('network_name', String(255)))
开发者ID:blamarvt,项目名称:melange,代码行数:7,代码来源:003_add_network_label_to_ip_blocks.py


示例15: create_table

def create_table(engine, table_name):
    """Create ``table_name`` on ``engine`` with an integer primary key ``id``.

    The table is registered in ``engine._metadata``, cached under
    ``TABLES[engine][table_name]`` and returned.
    """
    log.debug("Creating table: %s on %r" % (table_name, engine))
    new_table = Table(table_name, engine._metadata)
    new_table.append_column(Column('id', Integer, primary_key=True))
    new_table.create(engine)

    engine_tables = TABLES[engine]
    engine_tables[table_name] = new_table
    return new_table
开发者ID:asuffield,项目名称:sqlaload,代码行数:8,代码来源:schema.py


示例16: upgrade

def upgrade(migrate_engine):
    """Add an ``allocated`` boolean to ``ip_addresses`` and set it to True.

    The column is declared with ``default=False``; the update that follows
    sets ``allocated=True`` on every existing row.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    ip_addresses = Table('ip_addresses', meta, autoload=True)
    ip_addresses.create_column(Column('allocated', Boolean(), default=False))

    mark_all_allocated = ip_addresses.update().values(allocated=True)
    migrate_engine.execute(mark_all_allocated)
开发者ID:blamarvt,项目名称:melange,代码行数:8,代码来源:006_single_table_allocation.py


示例17: add_aspect

 def add_aspect(self, name, *columns):
     """Create an aspect table called ``name`` and register it.

     The table gets an integer ``id`` column that references ``entity.id``,
     followed by the given ``columns``. The names of ``columns`` are stored
     in ``self.aspects[name]`` and ``self.aspect_names`` is rebuilt from the
     keys of ``self.aspects``.
     """
     table = Table(
         name, self.metadata,
         Column('id', types.Integer, ForeignKey("entity.id")),
         *columns)
     table.create()
     self.aspects[name] = [x.name for x in columns]
     # Iterating a mapping yields its keys on Python 2 and 3 alike;
     # ``iterkeys()`` exists only on Python 2.
     self.aspect_names = set(self.aspects)
开发者ID:tomster,项目名称:stasis,代码行数:8,代码来源:entities.py


示例18: _table

        def _table(table):
            """Return a copy of ``table`` without the excluded columns.

            The copy lives in a fresh ``MetaData``. Columns whose name
            appears in ``columns`` (a name defined outside this function)
            are left out.
            """
            stripped = Table(table.name, MetaData())
            kept = (col for col in table.columns if col.name not in columns)
            for col in kept:
                stripped.append_column(col.copy())
            return stripped
开发者ID:Acidburn0zzz,项目名称:Ouroboros,代码行数:8,代码来源:meta.py


示例19: wrap

		def wrap(fn):
			"""Build and create the table described by ``fn``.

			``fn`` is called with a fresh ``TableDefinition``. Each entry of
			its ``fields`` mapping is an ``(args, kwargs)`` pair used to build
			a ``Column`` named after the key. ``name`` and ``self`` come from
			the enclosing scope.
			"""
			definition = TableDefinition()
			fn(definition)
			new_table = Table(name, self.meta)
			for field_name, (col_args, col_kwargs) in definition.fields.items():
				new_table.append_column(Column(field_name, *col_args, **col_kwargs))
			new_table.create()
开发者ID:vuuvv,项目名称:vweb.old,代码行数:8,代码来源:db.py


示例20: test_reflect_select

 def test_reflect_select(self, engine, connection):
     """reflecttable should be able to fill in a table from the name"""
     metadata = MetaData(bind=engine)
     reflected = Table('one_row_complex', metadata, autoload=True)
     self.assertEqual(len(reflected.c), 15)
     self.assertIsInstance(reflected.c.string, Column)

     # The table holds a single row with known contents.
     fetched = reflected.select().execute().fetchall()
     self.assertEqual(len(fetched), 1)
     self.assertEqual(list(fetched[0]), _ONE_ROW_COMPLEX_CONTENTS)
开发者ID:ethanliou,项目名称:PyHive,代码行数:8,代码来源:test_sqlalchemy_hive.py



注:本文中的sqlalchemy.schema.Table类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sql._sql函数代码示例发布时间:2022-05-27
下一篇:
Python schema.MetaData类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap