本文整理汇总了Python中sqlalchemy.schema.Table类的典型用法代码示例。如果您正苦于以下问题:Python Table类的具体用法?Python Table怎么用?Python Table使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Table类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_insert_table
def test_insert_table(engine_testaccount):
metadata = MetaData()
users = Table('users', metadata,
Column('id', Integer, Sequence('user_id_seq'),
primary_key=True),
Column('name', String),
Column('fullname', String),
)
metadata.create_all(engine_testaccount)
data = [{
'id': 1,
'name': 'testname1',
'fullname': 'fulltestname1',
}, {
'id': 2,
'name': 'testname2',
'fullname': 'fulltestname2',
}]
conn = engine_testaccount.connect()
try:
# using multivalue insert
conn.execute(users.insert(data))
results = conn.execute(select([users]).order_by('id'))
row = results.fetchone()
assert row['name'] == 'testname1'
finally:
conn.close()
users.drop(engine_testaccount)
开发者ID:snowflakedb,项目名称:snowflake-sqlalchemy,代码行数:30,代码来源:test_multivalues_insert.py
示例2: upgrade
def upgrade(migrate_engine):
"""Create shares and share_access_map tables."""
meta = MetaData()
meta.bind = migrate_engine
shares = Table('shares', meta, autoload=True)
share_snapshots = Table(
'share_snapshots', meta,
Column('created_at', DateTime),
Column('updated_at', DateTime),
Column('deleted_at', DateTime),
Column('deleted', Boolean),
Column('id', String(length=36), primary_key=True, nullable=False),
Column('user_id', String(length=255)),
Column('project_id', String(length=255)),
Column('share_id', String(36), ForeignKey('shares.id'),
nullable=False),
Column('size', Integer),
Column('status', String(length=255)),
Column('progress', String(length=255)),
Column('display_name', String(length=255)),
Column('display_description', String(length=255)),
Column('share_size', Integer),
Column('share_proto', String(length=255)),
Column('export_location', String(255)),
mysql_engine='InnoDB')
try:
share_snapshots.create()
except Exception:
LOG.error(_("Table %r not created!"), share_snapshots)
raise
开发者ID:aostapenko,项目名称:manila,代码行数:32,代码来源:004_add_share_snapshot_table.py
示例3: upgrade
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
ip_blocks = Table('ip_blocks', meta, autoload=True)
max_allocation = Column('max_allocation', Integer())
ip_blocks.create_column(max_allocation)
开发者ID:blamarvt,项目名称:melange,代码行数:7,代码来源:005_add_ip_block_max_allocation.py
示例4: test_clone_table_adds_or_deletes_columns
def test_clone_table_adds_or_deletes_columns(self):
meta = MetaData()
meta.bind = self.engine
table = Table('dummy',
meta,
Column('id', String(36), primary_key=True,
nullable=False),
Column('A', Boolean, default=False)
)
table.create()
newcols = [
Column('B', Boolean, default=False),
Column('C', String(255), default='foobar')
]
ignorecols = [
table.c.A.name
]
new_table = migrate_utils.clone_table('new_dummy', table, meta,
newcols=newcols,
ignorecols=ignorecols)
col_names = [c.name for c in new_table.columns]
self.assertEqual(3, len(col_names))
self.assertIsNotNone(new_table.c.B)
self.assertIsNotNone(new_table.c.C)
self.assertNotIn('A', col_names)
开发者ID:aaratn,项目名称:heat,代码行数:29,代码来源:test_utils.py
示例5: upgrade
def upgrade(migrate_engine):
meta.bind = migrate_engine
# Load the database tables
servers_table = Table('servers', meta, autoload=True)
pool_attrib_table = Table('pool_attributes', meta, autoload=True)
# Read in all the servers to migrate to pool_attributes table
servers = select(
columns=[
servers_table.c.id,
servers_table.c.created_at,
servers_table.c.updated_at,
servers_table.c.version,
servers_table.c.name
]
).execute().fetchall()
for server in servers:
pool_attrib_table.insert().execute(
id=server.id,
created_at=server.created_at,
updated_at=server.updated_at,
version=server.version,
key='name_server',
value=server.name,
pool_id=default_pool_id
)
开发者ID:AnatolyZimin,项目名称:designate,代码行数:28,代码来源:049_migrate_servers.py
示例6: downgrade
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
tasks_table = Table('tasks', meta, autoload=True)
task_info_table = Table('task_info', meta, autoload=True)
for col_name in TASKS_MIGRATE_COLUMNS:
column = Column(col_name, Text())
column.create(tasks_table)
task_info_records = task_info_table.select().execute().fetchall()
for task_info in task_info_records:
values = {
'input': task_info.input,
'result': task_info.result,
'message': task_info.message
}
tasks_table\
.update(values=values)\
.where(tasks_table.c.id == task_info.task_id)\
.execute()
drop_tables([task_info_table])
开发者ID:Web5design,项目名称:glance,代码行数:26,代码来源:032_add_task_info_table.py
示例7: test_reflect_select
def test_reflect_select(self, engine, connection):
"""reflecttable should be able to fill in a table from the name"""
one_row_complex = Table('one_row_complex', MetaData(bind=engine), autoload=True)
self.assertEqual(len(one_row_complex.c), 15)
self.assertIsInstance(one_row_complex.c.string, Column)
rows = one_row_complex.select().execute().fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(list(rows[0]), _ONE_ROW_COMPLEX_CONTENTS)
try:
from sqlalchemy.types import BigInteger
except ImportError:
from sqlalchemy.databases.mysql import MSBigInteger as BigInteger
# TODO some of these types could be filled in better
self.assertIsInstance(one_row_complex.c.boolean.type, types.Boolean)
self.assertIsInstance(one_row_complex.c.tinyint.type, types.Integer)
self.assertIsInstance(one_row_complex.c.smallint.type, types.Integer)
self.assertIsInstance(one_row_complex.c.int.type, types.Integer)
self.assertIsInstance(one_row_complex.c.bigint.type, BigInteger)
self.assertIsInstance(one_row_complex.c.float.type, types.Float)
self.assertIsInstance(one_row_complex.c.double.type, types.Float)
self.assertIsInstance(one_row_complex.c.string.type, types.String)
self.assertIsInstance(one_row_complex.c.timestamp.type, HiveTimestamp)
self.assertIsInstance(one_row_complex.c.binary.type, types.String)
self.assertIsInstance(one_row_complex.c.array.type, types.String)
self.assertIsInstance(one_row_complex.c.map.type, types.String)
self.assertIsInstance(one_row_complex.c.struct.type, types.String)
self.assertIsInstance(one_row_complex.c.union.type, types.String)
self.assertIsInstance(one_row_complex.c.decimal.type, HiveDecimal)
开发者ID:KMK-ONLINE,项目名称:PyHive,代码行数:30,代码来源:test_sqlalchemy_hive.py
示例8: _exclude_columns_table
def _exclude_columns_table(table):
new_table = Table(table.name, MetaData())
for c in table.columns:
if c.name not in columns:
new_table.append_column(c.copy())
return new_table
开发者ID:pwdtnx,项目名称:ouroboros,代码行数:7,代码来源:meta.py
示例9: create_table
def create_table(self, table_name, columns, ctypes, keys):
"""
This method is used to create new table in the database.
Input:
Database configuration object and table name
Output:
New table created
"""
#before creating a new table check if that table exists
self.table_name = table_name
table_flag = self.check_if_table_exists(table_name)
if table_flag:
print 'Table already exists in the database. No need to create a new table'
else:
#create a new table since it does not exist
print 'Table - %s does not exist. Create a new table' %(table_name)
#get the description of the table
table_columns = self.get_table_desc(columns, ctypes, keys)
try:
new_table = Table(
self.table_name,
self.metadata,
*table_columns
)
#create new table
new_table.create(checkfirst = True)
print "Table '%s' created"%self.table_name
except:
print 'Error while creating the table %s'%self.table_name
raise Exception
开发者ID:foss-transportationmodeling,项目名称:simtravel,代码行数:34,代码来源:database_connection.py
示例10: __init__
def __init__(self, db, schema, table, columns=None):
self.db = db
self.schema = schema
self.name = table
self.engine = create_engine(db.url)
self.metadata = MetaData(schema=schema)
self.metadata.bind = self.engine
# http://docs.sqlalchemy.org/en/rel_1_0/core/metadata.html
# if provided columns (SQLAlchemy columns), create the table
if table:
if columns:
self.table = SQLATable(
table, self.metadata, schema=self.schema, *columns
)
self.table.create()
# otherwise just load from db
else:
self.table = SQLATable(
table, self.metadata, schema=self.schema, autoload=True
)
self.indexes = dict((i.name, i) for i in self.table.indexes)
self._is_dropped = False
else:
self._is_dropped = True
self.table = None
开发者ID:smnorris,项目名称:pgdb,代码行数:25,代码来源:table.py
示例11: downgrade
def downgrade(migrate_engine):
meta.bind = migrate_engine
keys = Enum(name='key', metadata=meta, *ZONE_ATTRIBUTE_KEYS)
types = Enum(name='types', metadata=meta, *ZONE_TYPES)
domains_attributes_table = Table('domain_attributes', meta, autoload=True)
domains_table = Table('domains', meta, autoload=True)
domains = select(columns=[domains_table.c.id, domains_table.c.type])\
.where(domains_table.c.type == 'SECONDARY')\
.execute().fetchall()
for dom in domains:
delete = domains_table.delete()\
.where(domains_table.id == dom.id)
delete.execute()
domains_table.c.type.drop()
domains_table.c.transferred_at.drop()
domains_attributes_table.drop()
keys.drop()
types.drop()
dialect = migrate_engine.url.get_dialect().name
if dialect.startswith('sqlite'):
constraint = UniqueConstraint(
'name', 'deleted', name='unique_domain_name', table=domains_table)
# Add missing unique index
constraint.create()
开发者ID:jkhelil,项目名称:designate,代码行数:32,代码来源:052_secondary_zones.py
示例12: test_reflect_select
def test_reflect_select(self, engine, connection):
"""reflecttable should be able to fill in a table from the name"""
one_row_complex = Table('one_row_complex', MetaData(bind=engine), autoload=True)
# Presto ignores the union and decimal columns
self.assertEqual(len(one_row_complex.c), 15 - 2)
self.assertIsInstance(one_row_complex.c.string, Column)
rows = one_row_complex.select().execute().fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(list(rows[0]), [
True,
127,
32767,
2147483647,
9223372036854775807,
0.5,
0.25,
'a string',
'1970-01-01 00:00:00.000',
'123',
[1, 2],
{"1": 2, "3": 4}, # Presto converts all keys to strings so that they're valid JSON
[1, 2], # struct is returned as a list of elements
#'{0:1}',
#0.1,
])
开发者ID:Uberi,项目名称:PyHive,代码行数:25,代码来源:test_sqlalchemy_presto.py
示例13: update_item_saved_info
def update_item_saved_info(item):
engine = get_onitsuka_db_engine()
item_owner_id = item['owner_id']
item_id = item['item_id']
user_following = Table('user_following', metaData, autoload=True, autoload_with = engine)
s = select([user_following.c.user_id], (user_following.c.following_id==item_owner_id))
result = engine.execute(s)
user_feed_update_list = list()
for follower in result:
item_owner_follower_id = follower['user_id']
print item_owner_follower_id
user_feed_update_item = {}
user_feed_update_item['user_id'] = item_owner_follower_id
user_feed_update_item['owner_id'] = item_owner_id
user_feed_update_item['item_id'] = item_id
user_feed_update_list.append(user_feed_update_item)
result.close()
user_feed_table = Table('user_feed', metaData, autoload=True, autoload_with = engine)
ins = user_feed_table.insert().values(user_id=bindparam('user_id'), owner_id=bindparam('owner_id'), item_id=bindparam('item_id'))
engine.execute(ins, user_feed_update_list)
开发者ID:timewalker00,项目名称:gotwish-onitsuka-worker,代码行数:30,代码来源:dao.py
示例14: upgrade
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
ip_blocks = Table('ip_blocks', meta, autoload=True)
network_name = Column('network_name', String(255))
ip_blocks.create_column(network_name)
开发者ID:blamarvt,项目名称:melange,代码行数:7,代码来源:003_add_network_label_to_ip_blocks.py
示例15: create_table
def create_table(engine, table_name):
log.debug("Creating table: %s on %r" % (table_name, engine))
table = Table(table_name, engine._metadata)
col = Column('id', Integer, primary_key=True)
table.append_column(col)
table.create(engine)
TABLES[engine][table_name] = table
return table
开发者ID:asuffield,项目名称:sqlaload,代码行数:8,代码来源:schema.py
示例16: upgrade
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
ip_addresses = Table('ip_addresses', meta, autoload=True)
allocated = Column('allocated', Boolean(), default=False)
ip_addresses.create_column(allocated)
migrate_engine.execute(ip_addresses.update().values(allocated=True))
开发者ID:blamarvt,项目名称:melange,代码行数:8,代码来源:006_single_table_allocation.py
示例17: add_aspect
def add_aspect(self, name, *columns):
table = Table(
name, self.metadata,
Column('id', types.Integer, ForeignKey("entity.id")),
*columns)
table.create()
self.aspects[name] = [x.name for x in columns]
self.aspect_names = set(self.aspects.iterkeys())
开发者ID:tomster,项目名称:stasis,代码行数:8,代码来源:entities.py
示例18: _table
def _table(table):
src_table = table
new_table = Table(src_table.name, MetaData())
for c in src_table.columns:
if c.name not in columns:
new_table.append_column(c.copy())
return new_table
开发者ID:Acidburn0zzz,项目名称:Ouroboros,代码行数:8,代码来源:meta.py
示例19: wrap
def wrap(fn):
table_definition = TableDefinition()
fn(table_definition)
table = Table(name, self.meta)
for attrname in table_definition.fields.keys():
args, kw = table_definition.fields[attrname]
table.append_column(Column(attrname, *args, **kw))
table.create()
开发者ID:vuuvv,项目名称:vweb.old,代码行数:8,代码来源:db.py
示例20: test_reflect_select
def test_reflect_select(self, engine, connection):
"""reflecttable should be able to fill in a table from the name"""
one_row_complex = Table('one_row_complex', MetaData(bind=engine), autoload=True)
self.assertEqual(len(one_row_complex.c), 15)
self.assertIsInstance(one_row_complex.c.string, Column)
rows = one_row_complex.select().execute().fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(list(rows[0]), _ONE_ROW_COMPLEX_CONTENTS)
开发者ID:ethanliou,项目名称:PyHive,代码行数:8,代码来源:test_sqlalchemy_hive.py
注:本文中的sqlalchemy.schema.Table类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论