本文整理汇总了 Python 中 sqlalchemy.schema.Column 类的典型用法代码示例。如果您正苦于以下问题:Python Column 类的具体用法?Python Column 怎么用?Python Column 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 Column 类的 17 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: upgrade
def upgrade(migrate_engine):
    """Add a unique, non-nullable ``hash`` column to the ``records`` table.

    The column is created nullable, then filled row by row with
    ``_build_hash(record)``.  A row whose update raises ``IntegrityError``
    is deleted, and its domain is logged (once per domain) as needing
    synchronisation.  Finally the column is altered to ``nullable=False``.

    :param migrate_engine: engine the module-level ``meta`` gets bound to.
    """
    meta.bind = migrate_engine

    records_table = Table('records', meta, autoload=True)

    # Add the hash column, start with allowing NULLs
    hash_column = Column('hash', String(32), nullable=True, default=None,
                         unique=True)
    hash_column.create(records_table, unique_name='unique_record')

    # Domains already reported as needing a re-sync; a set keeps the
    # membership test cheap and guarantees one warning per domain.
    sync_domains = set()

    # Fill out the hash values. We need to do this in a way that lets us track
    # which domains need to be re-synced, so having the DB do this directly
    # won't work.
    for record in records_table.select().execute():
        try:
            records_table.update()\
                .where(records_table.c.id == record.id)\
                .values(hash=_build_hash(record))\
                .execute()
        except IntegrityError:
            # Presumably the computed hash collides with an existing row
            # under the new unique constraint -- verify against the DB.
            if record.domain_id not in sync_domains:
                sync_domains.add(record.domain_id)
                LOG.warning(_LW("Domain '%s' needs to be synchronised"),
                            record.domain_id)

            # The offending row is removed whether or not its domain was
            # already reported.
            records_table.delete()\
                .where(records_table.c.id == record.id)\
                .execute()

    # Finally, the column should not be nullable.
    records_table.c.hash.alter(nullable=False)
开发者ID:dhellmann,项目名称:designate,代码行数:33,代码来源:015_add_unique_record_constraint.py
示例2: upgrade
def upgrade(migrate_engine):
    """Add a ``pool_id`` column to ``domains``, pre-filled with the default.

    The default pool id is read from ``cfg.CONF['service:central']``, used
    to populate existing rows, and then removed as the column default.  On
    SQLite the ``unique_domain_name`` constraint is created afterwards.

    :param migrate_engine: engine the module-level ``meta`` gets bound to.
    """
    meta.bind = migrate_engine
    domains = Table('domains', meta, autoload=True)

    # Configured pool that every existing domain is assigned to.
    configured_pool = cfg.CONF['service:central'].default_pool_id

    new_column = Column('pool_id', UUID(), default=configured_pool,
                        nullable=True)
    new_column.create(domains, populate_default=True)

    # The default was only needed to populate existing rows; drop it again.
    domains.c.pool_id.alter(default=None)

    if not migrate_engine.url.get_dialect().name.startswith('sqlite'):
        return

    # Add missing unique index
    UniqueConstraint('name', 'deleted',
                     name='unique_domain_name',
                     table=domains).create()
开发者ID:Vegasq,项目名称:designate,代码行数:25,代码来源:044_add_pool_id_to_domains.py
示例3: downgrade
def downgrade(migrate_engine):
    """Drop the ``hash`` column from the ``records`` table.

    :param migrate_engine: engine the module-level ``meta`` gets bound to.
    """
    meta.bind = migrate_engine
    records = Table('records', meta, autoload=True)

    # Describe the column the same way it was added, then drop it.
    Column('hash', String(32), nullable=False, unique=True).drop(records)
开发者ID:dhellmann,项目名称:designate,代码行数:7,代码来源:015_add_unique_record_constraint.py
示例4: downgrade
def downgrade(migrate_engine):
    """Fold ``task_info`` back into ``tasks`` and drop ``task_info``.

    One ``Text`` column is added to ``tasks`` for each name in
    ``TASKS_MIGRATE_COLUMNS``; then every ``task_info`` row has its
    ``input``, ``result`` and ``message`` copied onto the ``tasks`` row
    whose ``id`` equals its ``task_id``.

    :param migrate_engine: engine to run the migration against.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    tasks = Table('tasks', meta, autoload=True)
    task_info = Table('task_info', meta, autoload=True)

    # Re-create the columns on the tasks table.
    for name in TASKS_MIGRATE_COLUMNS:
        Column(name, Text()).create(tasks)

    # Copy the per-task details back, one UPDATE per task_info row.
    for info in task_info.select().execute().fetchall():
        copy_back = tasks.update(values={
            'input': info.input,
            'result': info.result,
            'message': info.message
        }).where(tasks.c.id == info.task_id)
        copy_back.execute()

    drop_tables([task_info])
开发者ID:Web5design,项目名称:glance,代码行数:26,代码来源:032_add_task_info_table.py
示例5: upgrade
def upgrade(migrate_engine):
    """Add a boolean ``delayed_notify`` column to ``zones`` and index it.

    :param migrate_engine: engine the module-level ``meta`` gets bound to
        and on which the index is created.
    """
    LOG.info(_LI("Adding boolean column delayed_notify to table 'zones'"))
    meta.bind = migrate_engine
    zones = Table('zones', meta, autoload=True)

    Column('delayed_notify', Boolean(), default=False).create(zones)

    # The index shares its name with the column it covers.
    Index('delayed_notify', zones.c.delayed_notify).create(migrate_engine)
开发者ID:ISCAS-VDI,项目名称:designate-base,代码行数:8,代码来源:084_add_delayed_notify_column.py
示例6: _ensure_columns
def _ensure_columns(self, row):
    """Create a table column for every key of ``row`` that has none yet.

    Each missing key is passed through ``validate_columnname`` and its
    column type is chosen by ``self._guess_type`` from the row's value.

    :param row: mapping of column name to value.
    """
    existing = set(self.table.columns.keys())
    missing = set(row.keys()) - existing
    for name in map(validate_columnname, missing):
        guessed = self._guess_type(name, row[name])
        log.debug("Creating column: %s (%s) on %r" % (name,
                  guessed, self.table.name))
        Column(name, guessed).create(self.table, connection=self.bind)
开发者ID:Spencerx,项目名称:webstore,代码行数:9,代码来源:database.py
示例7: create_column
def create_column(self, name, type):
    """
    Create a new column ``name`` of the given type, unless it exists.

    ``type`` must be a `SQLAlchemy column type
    <http://docs.sqlalchemy.org/en/rel_0_8/core/types.html>`_.
    ::

        table.create_column('created_at', sqlalchemy.DateTime)
    """
    self._check_dropped()
    with self.database.lock:
        # Nothing to do when the table already has a column of that name.
        if name in self.table.columns.keys():
            return
        new_column = Column(name, type)
        new_column.create(self.table, connection=self.database.engine)
开发者ID:aklaver,项目名称:dataset,代码行数:14,代码来源:table.py
示例8: __init__
def __init__(self, *args, **kwargs):
    """ Initialise the column from mixed type/schema/column kwargs.

    * Type-specific kwargs are split off by ``process_type_args``.
    * Without positional ``args`` (a schema definition), a schema item
      is generated from the remaining kwargs and the SQLA type is built
      from the type-specific ones.
    * With positional ``args`` (a column proxy is being created), they
      are unpacked as ``(name, type_, schema_item)`` and no new type is
      instantiated.
    * Whatever remains is filtered by ``process_column_args`` and passed
      on to ``Column.__init__``.
    """
    if not hasattr(self, '_kwargs_backup'):
        self._kwargs_backup = kwargs.copy()

    type_args, type_kw, remaining_kw = self.process_type_args(kwargs)

    if args:
        # Column proxy is created by declarative extension
        column_kw = self.process_column_args(remaining_kw)
        column_kw['name'], column_kw['type_'], schema_item = args
    else:
        # Column init when defining a schema
        schema_item, remaining_kw = self._generate_schema_item(remaining_kw)
        column_kw = self.process_column_args(remaining_kw)
        column_kw['type_'] = self._sqla_type_cls(*type_args, **type_kw)

    if 'type_' not in kwargs:
        self._init_kwargs = self._kwargs_backup.copy()

    return Column.__init__(self, schema_item, **column_kw)
开发者ID:geniusproject,项目名称:nefertari-sqla,代码行数:25,代码来源:fields.py
示例9: upgrade
def upgrade(migrate_engine):
    """Add ``tenant_id`` and ``availability_zone`` to ``service_images``.

    Both columns are ``String(255)`` and are created in that order.

    :param migrate_engine: engine to run the migration against.
    """
    meta = MetaData(bind=migrate_engine)
    service_images = Table('service_images', meta, autoload=True)

    for column_name in ('tenant_id', 'availability_zone'):
        Column(column_name, String(255)).create(service_images)
开发者ID:hpcloud,项目名称:reddwarf_lite,代码行数:7,代码来源:006_Add_TenantID_AZ_ServiceImages.py
示例10: upgrade
def upgrade(migrate_engine):
    """Add ``scope`` and ``resource_id`` columns to the ``tsigkeys`` table.

    ``scope`` uses a newly created ``tsig_scopes`` enum with a server
    default of ``'POOL'``.  ``resource_id`` is added nullable, populated
    with ``default_pool_id`` for existing rows, then made non-nullable.
    On SQLite the ``unique_tsigkey_name`` constraint is created last.

    :param migrate_engine: engine the module-level ``meta`` gets bound to.
    """
    meta.bind = migrate_engine

    # Load the TSIG Keys tables
    tsigkeys = Table('tsigkeys', meta, autoload=True)

    scope_enum = Enum(name='tsig_scopes', metadata=meta, *TSIG_SCOPES)
    scope_enum.create()

    # Create the scope and resource columns
    Column('scope', scope_enum, nullable=False,
           server_default='POOL').create(tsigkeys)

    # Start with nullable=True and populate_default=True, then convert
    # to nullable=False once all rows have been populated with a resource_id
    Column('resource_id', UUID, default=default_pool_id,
           nullable=True).create(tsigkeys, populate_default=True)

    # Now that we've populated the default pool id in existing rows, MySQL
    # will let us convert this over to nullable=False
    tsigkeys.c.resource_id.alter(nullable=False)

    if not migrate_engine.url.get_dialect().name.startswith('sqlite'):
        return

    # Add missing unique index
    UniqueConstraint('name', name='unique_tsigkey_name',
                     table=tsigkeys).create()
开发者ID:jkhelil,项目名称:designate,代码行数:29,代码来源:051_scoped_tsig.py
示例11: upgrade
def upgrade(migrate_engine):
    """Rebuild the ``database`` rows of ``service_flavors`` with RAM/vCPU data.

    Steps, in order:

    1. Delete every row whose ``service_name`` is ``'database'``.
    2. Add integer ``ram`` and ``vcpus`` columns to the table.
    3. Insert one row per entry of ``SERVICE_FLAVORS``.

    Steps 1 and 3 each run in their own transaction on a dedicated
    connection.  If a statement fails the transaction is rolled back and
    the error re-raised; the connection is closed in every case.

    :param migrate_engine: engine to run the migration against.
    """
    meta = MetaData(bind=migrate_engine)
    service_flavors = Table('service_flavors', meta, autoload=True)

    # Step 1: remove the existing 'database' flavors.
    conn = migrate_engine.connect()
    try:
        trans = conn.begin()
        try:
            delete = service_flavors.delete()\
                .where(service_flavors.c.service_name == 'database')
            conn.execute(delete)
            trans.commit()
        except Exception:
            trans.rollback()
            raise
    finally:
        conn.close()

    # Step 2: add the new columns.
    ramc = Column('ram', Integer())
    ramc.create(service_flavors)
    vcpusc = Column('vcpus', Integer())
    vcpusc.create(service_flavors)

    # Step 3: re-insert the flavors, now including ram and vcpus.
    conn = migrate_engine.connect()
    try:
        trans = conn.begin()
        try:
            for flavor in SERVICE_FLAVORS:
                # Execute on ``conn`` so the insert takes part in ``trans``.
                insert = service_flavors.insert().values(
                    id=flavor['id'],
                    service_name="database",
                    flavor_name=flavor['flavor_name'],
                    flavor_id=flavor['flavor_id'],
                    deleted=0,
                    ram=flavor['ram'],
                    vcpus=flavor['vcpus'],
                    created_at=datetime.datetime.now(),
                    updated_at=datetime.datetime.now())
                conn.execute(insert)
            # The original referenced ``trans.commit`` without calling it,
            # so this transaction was never committed.
            trans.commit()
        except Exception:
            trans.rollback()
            raise
    finally:
        conn.close()
开发者ID:hpcloud,项目名称:reddwarf_lite,代码行数:32,代码来源:012_service_flavors.py
示例12: downgrade
def downgrade(migrate_engine):
    """Split the priority back out of ``records.data`` into its own column.

    A nullable integer ``priority`` column is added to ``records``.  For
    every record of every record set returned by ``_get_recordsets``, the
    text before the first space of ``data`` becomes the priority, the text
    after it becomes the new ``data``, and the hash is rebuilt.  On SQLite
    the ``unique_recordset`` constraint on ``hash`` is created last.

    :param migrate_engine: engine the module-level ``meta`` gets bound to.
    """
    meta.bind = migrate_engine

    recordsets_tbl = Table('recordsets', meta, autoload=True)
    records_tbl = Table('records', meta, autoload=True)

    recordsets = _get_recordsets(recordsets_tbl)

    Column('priority', Integer, default=None, nullable=True)\
        .create(records_tbl)

    # Positional layout of each fetched row: (id, priority, data).
    wanted = [
        records_tbl.c.id,
        records_tbl.c.priority,
        records_tbl.c.data]

    for recordset in recordsets:
        recordset_id = recordset[0]
        rows = select(columns=wanted)\
            .where(records_tbl.c.recordset_id == recordset_id)\
            .execute().fetchall()

        for row in rows:
            priority, _sep, data = row[2].partition(" ")

            # Old style hashes are <rs_id>:<data>:<priority>
            new_hash = _build_hash(recordset_id, data, priority)

            records_tbl.update()\
                .where(records_tbl.c.id == row[0])\
                .values(priority=int(priority), data=data, hash=new_hash)\
                .execute()

    if not migrate_engine.url.get_dialect().name.startswith('sqlite'):
        return

    # Add missing unique index
    UniqueConstraint('hash',
                     name='unique_recordset',
                     table=records_tbl).create()
开发者ID:AnatolyZimin,项目名称:designate,代码行数:39,代码来源:042_priority_to_data.py
示例13: upgrade
def upgrade(migrate_engine):
    """Add the ``EXPORT`` task type and a ``location`` column to ``zone_tasks``.

    On PostgreSQL the ``task_types`` enum is extended with ``'EXPORT'``
    (after ``'IMPORT'``) on a connection switched to ``AUTOCOMMIT``.  On
    every dialect the ``task_type`` column is then altered to an enum built
    from ``TASK_TYPES`` and a nullable ``String(160)`` ``location`` column
    is added.

    :param migrate_engine: engine the module-level ``meta`` gets bound to.
    """
    meta.bind = migrate_engine

    zone_tasks_table = Table('zone_tasks', meta, autoload=True)

    # Looked up once; the original computed this twice.
    dialect = migrate_engine.url.get_dialect().name

    if dialect.startswith("postgresql"):
        # The ``with`` block closes the connection on exit, so no explicit
        # ``conn.close()`` is needed.
        with migrate_engine.connect() as conn:
            conn.execution_options(isolation_level="AUTOCOMMIT")
            conn.execute(
                "ALTER TYPE task_types ADD VALUE 'EXPORT' "
                "AFTER 'IMPORT'")

    zone_tasks_table.c.task_type.alter(type=Enum(name='task_type',
                                                 *TASK_TYPES))

    location = Column('location', String(160), nullable=True)
    location.create(zone_tasks_table)
开发者ID:TimSimmons,项目名称:designate,代码行数:22,代码来源:069_zone_tasks_location.py
示例14: upgrade
def upgrade(migrate_engine):
    """Create ``domain_attributes`` and add zone-type columns to ``domains``.

    ``domains`` gains a ``type`` column (a new ``types`` enum, defaulting to
    ``'PRIMARY'``) and a ``transferred_at`` column; afterwards the
    ``domain_attributes`` table is created.  On SQLite the
    ``unique_domain_name`` constraint is created last.

    :param migrate_engine: engine the module-level ``meta`` gets bound to.
    """
    meta.bind = migrate_engine

    attribute_keys = Enum(name='key', *ZONE_ATTRIBUTE_KEYS)

    attributes = Table(
        'domain_attributes', meta,
        Column('id', UUID(), default=utils.generate_uuid, primary_key=True),
        Column('version', Integer(), default=1, nullable=False),
        Column('created_at', DateTime, default=lambda: timeutils.utcnow()),
        Column('updated_at', DateTime, onupdate=lambda: timeutils.utcnow()),
        Column('key', attribute_keys),
        Column('value', String(255), nullable=False),
        Column('domain_id', UUID(), nullable=False),
        UniqueConstraint('key', 'value', 'domain_id',
                         name='unique_attributes'),
        ForeignKeyConstraint(['domain_id'], ['domains.id'],
                             ondelete='CASCADE'),
        mysql_engine='INNODB',
        mysql_charset='utf8'
    )

    domains = Table('domains', meta, autoload=True)

    zone_types = Enum(name='types', metadata=meta, *ZONE_TYPES)
    zone_types.create()

    # Add type and transferred_at to domains
    type_column = Column('type', zone_types, default='PRIMARY',
                         server_default='PRIMARY')
    transferred_column = Column('transferred_at', DateTime, default=None)
    type_column.create(domains, populate_default=True)
    transferred_column.create(domains, populate_default=True)

    attributes.create()

    if not migrate_engine.url.get_dialect().name.startswith('sqlite'):
        return

    # Add missing unique index
    UniqueConstraint(
        'name', 'deleted', name='unique_domain_name',
        table=domains).create()
开发者ID:jkhelil,项目名称:designate,代码行数:45,代码来源:052_secondary_zones.py
示例15: upgrade
def upgrade(migrate_engine):
    """Create ``recordsets`` and attach every record to its record set.

    Records are grouped by (tenant_id, domain_id, name, type); one row is
    inserted into ``recordsets_table`` per group.  A nullable
    ``recordset_id`` column is then added to ``records``, filled in for
    every record together with a rebuilt hash, made non-nullable and turned
    into a foreign key on ``recordsets_table.c.id``.  Finally the ``name``,
    ``type`` and ``ttl`` columns are dropped from ``records``.

    :param migrate_engine: engine the module-level ``meta`` gets bound to.
    """
    meta.bind = migrate_engine

    records = Table('records', meta, autoload=True)

    # We need to autoload the domains table for the FK to succeed.
    Table('domains', meta, autoload=True)

    def _rrset_key(row):
        # Key identifying the record set a row belongs to.
        return "%s.%s.%s" % (row.domain_id, row.name, row.type)

    # Maps "<domain_id>.<name>.<type>" to the id of the matching RRSet.
    rrset_ids = {}

    # Create the recordsets_table table
    recordsets_table.create()

    # NOTE(kiall): Since we need a unique UUID for each recordset, and need
    #              to maintain cross DB compatibility, we're stuck doing this
    #              in code rather than an
    #              INSERT INTO recordsets_table SELECT (..) FROM records;
    group_columns = [
        records.c.tenant_id,
        records.c.domain_id,
        records.c.name,
        records.c.type
    ]
    aggregate_columns = [
        func.min(records.c.ttl).label('ttl'),
        func.min(records.c.created_at).label('created_at'),
        func.max(records.c.updated_at).label('updated_at')
    ]
    groups = select(
        columns=group_columns + aggregate_columns,
        group_by=group_columns
    ).execute()

    for group in groups:
        # Create the new RecordSet and remember its id
        inserted = recordsets_table.insert().execute(
            tenant_id=group.tenant_id,
            domain_id=group.domain_id,
            name=group.name,
            type=group.type,
            ttl=group.ttl,
            created_at=group.created_at,
            updated_at=group.updated_at
        )
        rrset_ids[_rrset_key(group)] = inserted.inserted_primary_key[0]

    # Add the recordset column to the records table
    Column('recordset_id', UUID,
           default=None,
           nullable=True).create(records, populate_default=True)

    # Fetch all the records
    # TODO(kiall): Batch this..
    all_records = select(
        columns=[
            records.c.id,
            records.c.domain_id,
            records.c.name,
            records.c.type,
            records.c.data,
            records.c.priority
        ]
    ).execute()

    # Update each record with the appropriate recordset_id, and refresh
    # the hash column to reflect the removal of several fields.
    for record in all_records:
        recordset_id = rrset_ids[_rrset_key(record)]
        new_hash = _build_hash(recordset_id, record)

        records.update()\
            .where(records.c.id == record.id)\
            .values(recordset_id=recordset_id, hash=new_hash)\
            .execute()

    # Now that the records.recordset_id field is populated, ensure the
    # column is not nullable and is a FK to recordsets_table.c.id.
    records.c.recordset_id.alter(nullable=False)

    ForeignKeyConstraint(columns=[records.c.recordset_id],
                         refcolumns=[recordsets_table.c.id],
                         ondelete='CASCADE',
                         name='fkey_records_recordset_id').create()

    # Finally, drop the now-defunct columns from the records table
    for defunct in (records.c.name, records.c.type, records.c.ttl):
        defunct.drop()
开发者ID:akshatknsl,项目名称:designate,代码行数:97,代码来源:034_add_recordsets_table.py
示例16: __init__
def __init__(self):
    """Initialise this column as an ``Integer`` column named ``'foo'``."""
    Column.__init__(self, 'foo', Integer)
开发者ID:ContextLogic,项目名称:sqlalchemy,代码行数:2,代码来源:test_selectable.py
示例17: create_column
def create_column(engine, table, name, type):
    """Add column ``name`` of ``type`` to ``table`` unless it already exists.

    The check and the creation both happen while holding the module-level
    ``lock``.

    :param engine: passed to ``Column.create`` as ``connection``.
    :param table: table to extend.
    :param name: name of the column to create.
    :param type: column type handed to ``Column``.
    """
    with lock:
        # Nothing to do when the table already has a column of that name.
        if name in table.columns.keys():
            return
        Column(name, type).create(table, connection=engine)
开发者ID:rossjones,项目名称:sqlaload,代码行数:5,代码来源:schema.py
注:本文中的 sqlalchemy.schema.Column 类示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论