本文整理汇总了Python中sqlalchemy.Table类的典型用法代码示例。如果您正苦于以下问题：Python Table类的具体用法？Python Table怎么用？Python Table使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Table类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: store_crop_calendar
def store_crop_calendar(engine, fname_HDFstore, args):
    """Copy the CROP_CALENDAR table of a CGMS12 database into an HDF store.

    The table is reflected through ``engine`` and written, one crop at a
    time, to the ``/crop_calendar`` dataset of the HDF5 file
    ``fname_HDFstore``. When ``args.crop_no`` is not None only the records
    for that crop are copied; if that crop number does not occur in the
    table a message is printed and the process exits via ``sys.exit()``.
    """
    metadata = MetaData(engine)
    crop_calendar = Table("crop_calendar", metadata, autoload=True)

    # Distinct crop numbers present in the database table.
    distinct_crops = sa.select([crop_calendar.c.crop_no]).distinct()
    crops = [record[0] for record in distinct_crops.execute()]

    # Narrow the selection down to a single crop when one was requested.
    if args.crop_no is not None:
        if args.crop_no not in crops:
            print("Crop ID specified with --cropno (%s) not found in CROP_CALENDAR table! Returning..." % args.crop_no)
            sys.exit()
        crops = [args.crop_no]

    # Pull the crop_calendar records crop by crop and write them out.
    dataset_name = "/crop_calendar"
    with pd.io.pytables.HDFStore(fname_HDFstore) as store:
        for crop in crops:
            print("Storing crop_calendar for crop %i" % crop)
            query = crop_calendar.select().where(crop_calendar.c.crop_no == crop)
            frame = pd.read_sql(query, engine)
            if dataset_name not in store:
                # First chunk: create the dataset in table format.
                store.put(dataset_name, frame, format="table",
                          data_columns=["grid_no", "crop_no", "year"])
            else:
                store.append(dataset_name, frame,
                             data_columns=["grid_no", "crop_no", "year"])
开发者ID:ritviksahajpal,项目名称:pcse,代码行数:31,代码来源:cgms2hdfstore.py
示例2: dump_table
def dump_table(table, filename, con, std=None, delimiter=',', format=None, encoding='utf-8', inspector=None):
    """Write every row of ``table`` as text to a file or to standard output.

    :param table: table object to dump; ``table.select()`` is run through ``do_``
    :param filename: a path (str/unicode) that is opened for writing, or an
        already opened file-like object; only used when ``std`` is falsy
    :param con: not used by this function
    :param std: when truthy, output goes to ``sys.stdout``.
        NOTE(review): the passed object itself is never written to --
        confirm that ``std`` is meant to be a plain flag.
    :param delimiter: field delimiter used for the ``'txt'`` format
    :param format: falsy -> print each row as is; ``'txt'`` -> delimited
        text; any other value raises ``Exception`` on the first row
    :param encoding: passed to ``str_value`` for ``'txt'`` output
    :param inspector: when given, the columns are reflected from the
        database instead of being taken from the model fields
    """
    from uliweb.utils.common import str_value
    from StringIO import StringIO
    import csv

    # Remember whether this function opened the file, so that only a file
    # opened here is closed here (caller-supplied objects are left alone).
    opened_here = False
    if not std:
        if isinstance(filename, (str, unicode)):
            std = open(filename, 'w')
            opened_here = True
        else:
            std = filename
    else:
        std = sys.stdout

    try:
        # add inspector table columns process, will not use model fields
        # but database fields
        if inspector:
            meta = MetaData()
            table = Table(table.name, meta)
            inspector.reflecttable(table, None)

        result = do_(table.select())
        fields = [x.name for x in table.c]

        # Header line listing the column names.
        if not format:
            print >>std, '#' + ' '.join(fields)
        elif format == 'txt':
            print >>std, '#' + ','.join(fields)

        for r in result:
            if not format:
                print >>std, r
            elif format == 'txt':
                # Let the csv module handle quoting, then strip its line end.
                buf = StringIO()
                fw = csv.writer(buf, delimiter=delimiter)
                fw.writerow([str_value(x, encoding=encoding) for x in r])
                print >>std, buf.getvalue().rstrip()
            else:
                raise Exception("Can't support the text format %s" % format)
    finally:
        if opened_here:
            std.close()
开发者ID:tangjn,项目名称:uliweb,代码行数:34,代码来源:commands.py
示例3: _create_shadow_tables
def _create_shadow_tables(migrate_engine):
    """Create a ``shadow_<name>`` table for every table in the database.

    Each shadow table gets a copy of the columns of its source table.
    Columns whose reflected type is ``NullType`` are replaced by a
    ``BigInteger`` column with default 0 instead of being copied.

    :param migrate_engine: engine the tables are reflected from and the
        shadow tables are created on
    :raises Exception: whatever ``shadow_table.create`` raises is logged
        and re-raised
    """
    meta = MetaData(migrate_engine)
    meta.reflect(migrate_engine)
    # Snapshot the names: the shadow tables are registered in the same
    # MetaData below, which would otherwise grow the mapping mid-iteration.
    table_names = list(meta.tables.keys())

    meta.bind = migrate_engine

    for table_name in table_names:
        table = Table(table_name, meta, autoload=True)

        columns = []
        for column in table.columns:
            # NOTE(boris-42): BigInteger is not supported by sqlite, so
            #                 after copy it will have NullType, other
            #                 types that are used in Nova are supported by
            #                 sqlite.
            if isinstance(column.type, NullType):
                column_copy = Column(column.name, BigInteger(), default=0)
            else:
                # Only copy when no replacement was built above; copying
                # unconditionally would discard the BigInteger column.
                column_copy = column.copy()
            columns.append(column_copy)

        shadow_table_name = 'shadow_' + table_name
        shadow_table = Table(shadow_table_name, meta, *columns,
                             mysql_engine='InnoDB')
        try:
            shadow_table.create(checkfirst=True)
        except Exception:
            LOG.info(repr(shadow_table))
            LOG.exception(_('Exception while creating table.'))
            raise
开发者ID:Hybrid-Cloud,项目名称:conveyor,代码行数:31,代码来源:215_meta_only.py
示例4: test_foreignkey_missing_insert
def test_foreignkey_missing_insert(self):
    """An empty INSERT into t2 must raise for both implicit_returning modes."""
    Table("t1", self.metadata, Column("id", Integer, primary_key=True))
    child = Table(
        "t2",
        self.metadata,
        Column("id", Integer, ForeignKey("t1.id"), primary_key=True),
    )
    self.metadata.create_all()

    # want to ensure that "null value in column "id" violates not-
    # null constraint" is raised (IntegrityError on psycoopg2, but
    # ProgrammingError on pg8000), and not "ProgrammingError:
    # (ProgrammingError) relationship "t2_id_seq" does not exist".
    # the latter corresponds to autoincrement behavior, which is not
    # the case here due to the foreign key.
    test_engines = [
        engines.testing_engine(options={"implicit_returning": flag})
        for flag in (False, True)
    ]
    for eng in test_engines:
        with expect_warnings(".*has no Python-side or server-side default.*"):
            expected_errors = (exc.IntegrityError, exc.ProgrammingError)
            assert_raises(expected_errors, eng.execute, child.insert())
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:28,代码来源:test_query.py
示例5: update_dat_crimes
def update_dat_crimes():
    """Step five: insert the new crime records into the main crime table.

    Rows of ``src_chicago_crimes_all`` whose ``id`` also appears in
    ``new_chicago_crimes_all`` are inserted into ``dat_chicago_crimes_all``
    with today's date as ``start_date``.

    :returns: ``None`` when ``new_chicago_crimes_all`` cannot be reflected
        (``NoSuchTableError``), otherwise the string ``'Crime Table updated'``
    """
    dat_crime_table = Table('dat_chicago_crimes_all', Base.metadata,
                            autoload=True, autoload_with=engine,
                            extend_existing=True)
    src_crime_table = Table('src_chicago_crimes_all', Base.metadata,
                            autoload=True, autoload_with=engine,
                            extend_existing=True)
    try:
        new_crime_table = Table('new_chicago_crimes_all', Base.metadata,
                                autoload=True, autoload_with=engine,
                                extend_existing=True)
    except NoSuchTableError:
        return None

    # Target columns: everything in the dat table except these three.
    excluded_cols = ['end_date', 'current_flag', 'chicago_crimes_all_row_id']
    dat_cols = [c for c in dat_crime_table.columns.keys()
                if c not in excluded_cols]

    # Source columns additionally skip start_date; it is supplied as a
    # literal carrying today's date instead.
    excluded_cols.append('start_date')
    src_cols = [c for c in src_crime_table.columns
                if c.name not in excluded_cols]
    src_cols.append(
        text("'%s' AS start_date" % datetime.now().strftime('%Y-%m-%d')))

    ins = dat_crime_table.insert().from_select(
        dat_cols,
        select(src_cols).select_from(
            src_crime_table.join(
                new_crime_table,
                src_crime_table.c.id == new_crime_table.c.id)))

    conn = engine.contextual_connect()
    try:
        conn.execute(ins)
    finally:
        # Release the connection even when the INSERT fails.
        conn.close()
    return 'Crime Table updated'
开发者ID:EmilyWebber,项目名称:plenario,代码行数:26,代码来源:crime_helpers.py
示例6: __init__
def __init__(self):
    """Create the engine, declare the three games tables and create them.

    The table objects are stored on ``self.games_info``,
    ``self.games_online`` and ``self.games_triggers``; the engine on
    ``self.engine``.
    """
    schema = MetaData()
    # NOTE(review): the connection URL is hard-coded and its credential
    # part looks garbled in this copy of the file -- confirm before use.
    self.engine = create_engine(
        'mysql://root:[email protected]:3006/games_online',
        encoding='utf-8',
        pool_recycle=7200,
        pool_size=15,
        max_overflow=30)

    info_columns = [
        Column('id', INTEGER, primary_key=True),
        Column('gamecode', VARCHAR(20)),
        Column('language', VARCHAR(20)),
        Column('sid_api', VARCHAR(255)),
        Column('online_api', VARCHAR(255)),
    ]
    self.games_info = Table('games_info', schema, *info_columns)

    online_columns = [
        Column('id', INTEGER, primary_key=True),
        Column('gamecode', VARCHAR(20)),
        Column('language', VARCHAR(20)),
        Column('region', VARCHAR(20)),
        Column('serverid', INTEGER),
        Column('online', INTEGER),
        Column('time', INTEGER),
    ]
    self.games_online = Table('games_online', schema, *online_columns)

    trigger_columns = [
        Column('id', INTEGER, primary_key=True),
        Column('gamecode', VARCHAR(20)),
        Column('language', VARCHAR(20)),
        Column('region', VARCHAR(20)),
        Column('serverid', INTEGER),
        Column('time', INTEGER),
    ]
    self.games_triggers = Table('games_triggers', schema, *trigger_columns)

    # Emit CREATE TABLE for all tables declared on this MetaData.
    schema.create_all(self.engine)
开发者ID:jiaoxingrong,项目名称:DevOps,代码行数:30,代码来源:line_util.py
示例7: _test_selfref_fk
def _test_selfref_fk(self, recreate):
    """Check the self-referential FK on ``bar`` after a batch column rename.

    Creates ``bar`` with a foreign key from ``bar_id`` to its own ``id``,
    inserts two rows, renames ``data`` to ``newdata`` inside
    ``batch_alter_table`` and asserts that the reflected foreign key is
    still ``bar.bar_id -> bar.id``.

    :param recreate: forwarded to ``batch_alter_table(recreate=...)``
    """
    bar = Table(
        'bar', self.metadata,
        Column('id', Integer, primary_key=True),
        Column('bar_id', Integer, ForeignKey('bar.id')),
        Column('data', String(50)),
        mysql_engine='InnoDB'
    )
    bar.create(self.conn)
    self.conn.execute(bar.insert(), {'id': 1, 'data': 'x', 'bar_id': None})
    self.conn.execute(bar.insert(), {'id': 2, 'data': 'y', 'bar_id': 1})

    with self.op.batch_alter_table("bar", recreate=recreate) as batch_op:
        batch_op.alter_column(
            'data', new_column_name='newdata', existing_type=String(50))

    # One inspector is enough; it is created after the batch operation.
    insp = Inspector.from_engine(self.conn)

    eq_(
        [(key['referred_table'],
          key['referred_columns'], key['constrained_columns'])
         for key in insp.get_foreign_keys('bar')],
        [('bar', ['id'], ['bar_id'])]
    )
开发者ID:davidszotten,项目名称:alembic,代码行数:25,代码来源:test_batch.py
示例8: upgrade
def upgrade(migrate_engine):
    """Add ``allow_propose`` to the ``instance`` table and set it to True."""
    meta.bind = migrate_engine

    # Declaration of the ``instance`` table as it exists before this step.
    instance_columns = [
        Column('id', Integer, primary_key=True),
        Column('key', Unicode(20), nullable=False, unique=True),
        Column('label', Unicode(255), nullable=False),
        Column('description', UnicodeText(), nullable=True),
        Column('required_majority', Float, nullable=False),
        Column('activation_delay', Integer, nullable=False),
        Column('create_time', DateTime, default=func.now()),
        Column('access_time', DateTime, default=func.now(),
               onupdate=func.now()),
        Column('delete_time', DateTime, nullable=True),
        Column('creator_id', Integer, ForeignKey('user.id'), nullable=False),
        Column('default_group_id', Integer, ForeignKey('group.id'),
               nullable=True),
        Column('allow_adopt', Boolean, default=True),
        Column('allow_delegate', Boolean, default=True),
        Column('allow_index', Boolean, default=True),
        Column('hidden', Boolean, default=False),
        Column('locale', Unicode(7), nullable=True),
        Column('css', UnicodeText(), nullable=True),
        Column('use_norms', Boolean, nullable=True, default=True),
    ]
    instance_table = Table('instance', meta, *instance_columns)

    # Add the new column, then give every existing row the value True.
    allow_propose = Column('allow_propose', Boolean, default=True)
    allow_propose.create(instance_table)
    backfill = instance_table.update(values={'allow_propose': True})
    migrate_engine.execute(backfill)
开发者ID:AnonOnWarpath,项目名称:adhocracy,代码行数:27,代码来源:025_instance_gets_norm_propose_option.py
示例9: SQLBackend
class SQLBackend(BaseBackend):
    """Key/value backend that keeps serialized values in one SQL table.

    The table has an integer ``id`` primary key, a unique ``key`` column
    (``CHAR(32)``) and a non-null binary ``data`` column.
    """

    def __init__(self, url, table_name='gimlet_channels', **engine_kwargs):
        """Bind to the database at ``url`` and create the table if missing.

        :param url: database URL passed to ``create_engine``
        :param table_name: name of the storage table
        :param engine_kwargs: extra keyword arguments for ``create_engine``
        """
        meta = MetaData(bind=create_engine(url, **engine_kwargs))
        self.table = Table(table_name, meta,
                           Column('id', types.Integer, primary_key=True),
                           Column('key', types.CHAR(32), nullable=False,
                                  unique=True),
                           Column('data', types.LargeBinary, nullable=False))
        self.table.create(checkfirst=True)

    def __setitem__(self, key, value):
        """Store ``value`` (serialized) under ``key``, updating or inserting."""
        table = self.table
        key_col = table.c.key
        raw = self.serialize(value)
        # Check if this key exists with a SELECT FOR UPDATE, to protect
        # against a race with other concurrent writers of this key.
        r = table.count(key_col == key, for_update=True).scalar()
        if r:
            # If it exists, use an UPDATE.
            table.update().values(data=raw).where(key_col == key).execute()
        else:
            # Otherwise INSERT.
            table.insert().values(key=key, data=raw).execute()

    def __getitem__(self, key):
        """Return the deserialized value stored under ``key``.

        :raises KeyError: when no row exists for ``key``
        """
        raw = select([self.table.c.data], self.table.c.key == key).scalar()
        # Test for None rather than truthiness, so that a stored but falsy
        # payload (e.g. empty bytes) is not reported as a missing key.
        if raw is None:
            raise KeyError('key %r not found' % key)
        return self.deserialize(raw)
开发者ID:wylee,项目名称:gimlet,代码行数:31,代码来源:sql.py
示例10: test_auto_append_constraint
def test_auto_append_constraint(self):
    """Constraints and indexes built from a table's columns attach to it."""
    metadata = MetaData()
    tbl = Table('tbl', metadata,
                Column('a', Integer),
                Column('b', Integer))
    other = Table('t2', metadata,
                  Column('a', Integer),
                  Column('b', Integer))

    built_constraints = (
        UniqueConstraint(tbl.c.a),
        CheckConstraint(tbl.c.a > 5),
        ForeignKeyConstraint([tbl.c.a], [other.c.a]),
        PrimaryKeyConstraint(tbl.c.a),
    )
    for constraint in built_constraints:
        # Already present from construction; still present after an
        # explicit append.
        assert constraint in tbl.constraints
        tbl.append_constraint(constraint)
        assert constraint in tbl.constraints

    index = Index('foo', tbl.c.a)
    assert index in tbl.indexes
开发者ID:Daniel-B-Smith,项目名称:sqlalchemy,代码行数:25,代码来源:test_constraints.py
示例11: test_tometadata_ok
def test_tometadata_ok(self):
    """``tometadata`` copies four constraints, each bound to the new table."""
    source_meta = MetaData()
    source = Table('tbl', source_meta,
                   Column('a', Integer),
                   Column('b', Integer))
    referred = Table('t2', source_meta,
                     Column('a', Integer),
                     Column('b', Integer))

    # Constructed only for their effect on ``source``; results are unused.
    UniqueConstraint(source.c.a)
    CheckConstraint(source.c.a > 5)
    ForeignKeyConstraint([source.c.a], [referred.c.a])
    PrimaryKeyConstraint(source.c.a)

    target_meta = MetaData()
    copied = source.tometadata(target_meta)

    eq_(len(copied.constraints), 4)
    for constraint in copied.constraints:
        assert constraint.table is copied
开发者ID:Daniel-B-Smith,项目名称:sqlalchemy,代码行数:26,代码来源:test_constraints.py
示例12: upgrade
def upgrade(migrate_engine):
    """Add workers table."""
    meta = MetaData()
    meta.bind = migrate_engine

    schema_items = [
        # Inherited fields from CinderBase
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean(), default=False),

        # Workers table specific fields
        Column('id', Integer, primary_key=True),
        Column('resource_type', String(40), nullable=False),
        Column('resource_id', String(36), nullable=False),
        Column('status', String(255), nullable=False),
        Column('service_id', Integer, nullable=True),
        UniqueConstraint('resource_type', 'resource_id'),
    ]
    workers = Table('workers', meta, *schema_items,
                    mysql_engine='InnoDB',
                    mysql_charset='utf8')
    workers.create()

    # The foreign key to services.id is added after the table exists.
    services = Table('services', meta, autoload=True)
    service_fk = ForeignKeyConstraint(
        columns=[workers.c.service_id],
        refcolumns=[services.c.id])
    service_fk.create()
开发者ID:C2python,项目名称:cinder,代码行数:32,代码来源:076_add_workers_table.py
示例13: test_checksfor_sequence
def test_checksfor_sequence(self):
    """``create(checkfirst=True)`` runs when ``fooseq`` was created by hand."""
    metadata = self.metadata
    sequence = Sequence("fooseq")
    table = Table("mytable", metadata, Column("col1", Integer, sequence))

    # Drop the sequence, then put it back with raw SQL before creating
    # the table with checkfirst enabled.
    sequence.drop()
    testing.db.execute("CREATE SEQUENCE fooseq")
    table.create(checkfirst=True)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:7,代码来源:test_dialect.py
示例14: downgrade
def downgrade(migrate_engine):
    """Drop the ``uuid`` column from the ``virtual_interfaces`` table."""
    bound_meta = MetaData(bind=migrate_engine)
    vif_table = Table('virtual_interfaces', bound_meta, autoload=True)
    vif_table.drop_column('uuid')
开发者ID:AnyBucket,项目名称:OpenStack-Install-and-Understand-Guide,代码行数:7,代码来源:038_add_uuid_to_virtual_interfaces.py
示例15: test_reflection_with_unique_constraint
def test_reflection_with_unique_constraint(self):
    """A unique constraint reflects as a constraint only, not as an index."""
    insp = inspect(testing.db)

    declared = Table(
        'pgsql_uc', self.metadata,
        Column('a', String(10)),
        UniqueConstraint('a', name='uc_a'))
    declared.create()

    # PostgreSQL will create an implicit index for a unique
    # constraint.   Separately we get both
    index_names = {ix['name'] for ix in insp.get_indexes('pgsql_uc')}
    constraint_names = {
        uc['name'] for uc in insp.get_unique_constraints('pgsql_uc')}
    self.assert_('uc_a' in index_names)
    self.assert_('uc_a' in constraint_names)

    # reflection corrects for the dupe
    reflected = Table('pgsql_uc', MetaData(testing.db), autoload=True)
    index_names = {ix.name for ix in reflected.indexes}
    constraint_names = {uc.name for uc in reflected.constraints}
    self.assert_('uc_a' not in index_names)
    self.assert_('uc_a' in constraint_names)
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:27,代码来源:test_reflection.py
示例16: upgrade
def upgrade(migrate_engine):
    """Create the ``transfers`` table; log and re-raise if creation fails."""
    meta = MetaData()
    meta.bind = migrate_engine

    # Reflected into ``meta`` although the local name is not used again.
    # NOTE(review): presumably needed so ForeignKey('volumes.id') below
    # can be resolved -- confirm before removing.
    volumes = Table('volumes', meta, autoload=True)

    # New table
    transfer_columns = [
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean),
        Column('id', String(36), primary_key=True, nullable=False),
        Column('volume_id', String(length=36), ForeignKey('volumes.id'),
               nullable=False),
        Column('display_name', String(length=255)),
        Column('salt', String(length=255)),
        Column('crypt_hash', String(length=255)),
        Column('expires_at', DateTime(timezone=False)),
    ]
    transfers = Table('transfers', meta, *transfer_columns,
                      mysql_engine='InnoDB',
                      mysql_charset='utf8')

    try:
        transfers.create()
    except Exception:
        LOG.error(_("Table |%s| not created!"), repr(transfers))
        raise
开发者ID:KumaHo,项目名称:cinder,代码行数:29,代码来源:010_add_transfers_table.py
示例17: test_reflect_unique_index
def test_reflect_unique_index(self):
    """A unique index reflects as an index, not as a unique constraint."""
    insp = inspect(testing.db)

    # a unique index OTOH we are able to detect is an index
    # and not a unique constraint
    declared = Table(
        'pgsql_uc', self.metadata,
        Column('a', String(10)),
        Index('ix_a', 'a', unique=True))
    declared.create()

    indexes_by_name = {ix['name']: ix for ix in insp.get_indexes('pgsql_uc')}
    constraint_names = {
        uc['name'] for uc in insp.get_unique_constraints('pgsql_uc')}
    self.assert_('ix_a' in indexes_by_name)
    assert indexes_by_name['ix_a']['unique']
    self.assert_('ix_a' not in constraint_names)

    reflected = Table('pgsql_uc', MetaData(testing.db), autoload=True)
    indexes_by_name = {ix.name: ix for ix in reflected.indexes}
    constraint_names = {uc.name for uc in reflected.constraints}
    self.assert_('ix_a' in indexes_by_name)
    assert indexes_by_name['ix_a'].unique
    self.assert_('ix_a' not in constraint_names)
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:29,代码来源:test_reflection.py
示例18: upgrade
def upgrade(migrate_engine):
    """Create ``milestone`` and add the milestone columns to existing tables.

    Adds ``milestone_id`` to ``delegateable`` and a ``milestones`` flag to
    ``instance``; the flag is then set to False on every instance row.
    """
    meta.bind = migrate_engine

    # Reflect the existing tables into ``meta``. ``user_table`` is not
    # referenced again by name below.
    user_table = Table('user', meta, autoload=True)
    instance_table = Table('instance', meta, autoload=True)
    delegateable_table = Table('delegateable', meta, autoload=True)

    milestone_columns = [
        Column('id', Integer, primary_key=True),
        Column('instance_id', Integer, ForeignKey('instance.id'),
               nullable=False),
        Column('creator_id', Integer, ForeignKey('user.id'), nullable=False),
        Column('title', Unicode(255), nullable=True),
        Column('text', UnicodeText(), nullable=True),
        Column('time', DateTime),
        Column('create_time', DateTime, default=datetime.utcnow),
        Column('delete_time', DateTime),
    ]
    milestone_table = Table('milestone', meta, *milestone_columns)
    milestone_table.create()

    milestone_ref = Column('milestone_id', Integer,
                           ForeignKey('milestone.id'), nullable=True)
    milestone_ref.create(delegateable_table)

    milestones_flag = Column('milestones', Boolean, default=False)
    milestones_flag.create(instance_table)

    backfill = instance_table.update(values={'milestones': False})
    migrate_engine.execute(backfill)
开发者ID:AnonOnWarpath,项目名称:adhocracy,代码行数:25,代码来源:028_milestones.py
示例19: upgrade
def upgrade(migrate_engine):
    """Create the ``task_log`` table; on MySQL switch it to InnoDB."""
    meta = MetaData()
    meta.bind = migrate_engine

    # create new table
    task_log_columns = [
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean(create_constraint=True, name=None)),
        Column('id', Integer(), primary_key=True, nullable=False,
               autoincrement=True),
        Column('task_name', String(255), nullable=False),
        Column('state', String(255), nullable=False),
        Column('host', String(255), index=True, nullable=False),
        Column('period_beginning', String(255), index=True, nullable=False),
        Column('period_ending', String(255), index=True, nullable=False),
        Column('message', String(255), nullable=False),
        Column('task_items', Integer()),
        Column('errors', Integer()),
    ]
    task_log = Table('task_log', meta, *task_log_columns)

    try:
        task_log.create()
    except Exception:
        # Drop the table again before re-raising the failure.
        meta.drop_all(tables=[task_log])
        raise

    if migrate_engine.name == "mysql":
        migrate_engine.execute("ALTER TABLE task_log Engine=InnoDB")
开发者ID:SimiPro,项目名称:nova,代码行数:34,代码来源:108_task_log.py
示例20: _get_non_cell0_mappings
def _get_non_cell0_mappings():
    """Queries the API database for non-cell0 cell mappings."""
    api_meta = MetaData(bind=db_session.get_api_engine())
    mappings = Table('cell_mappings', api_meta, autoload=True)
    # Keep every row whose uuid differs from the cell0 uuid.
    not_cell0 = mappings.c.uuid != cell_mapping_obj.CellMapping.CELL0_UUID
    query = mappings.select().where(not_cell0)
    return query.execute().fetchall()
开发者ID:klmitch,项目名称:nova,代码行数:7,代码来源:status.py
注：本文中的sqlalchemy.Table类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论