• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sqlalchemy.Table类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.Table的典型用法代码示例。如果您正苦于以下问题:Python Table类的具体用法?Python Table怎么用?Python Table使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Table类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: store_crop_calendar

def store_crop_calendar(engine, fname_HDFstore, args):
    """Copy the CROP_CALENDAR table of a CGMS12 database into an HDF store.

    The table is read one crop at a time and each batch of rows is written
    to the ``/crop_calendar`` dataset of the store opened at
    ``fname_HDFstore``.

    If ``args.crop_no`` is set, only the records for that crop are copied;
    when the table contains no such crop a message is printed and the
    process exits.
    """
    calendar = Table("crop_calendar", MetaData(engine), autoload=True)

    # Every crop number that actually occurs in the table.
    distinct_crops = sa.select([calendar.c.crop_no]).distinct()
    crops = [record[0] for record in distinct_crops.execute()]

    # Narrow the export down to one crop when the caller asked for it.
    requested = args.crop_no
    if requested is not None:
        if requested not in crops:
            print("Crop ID specified with --cropno (%s) not found in CROP_CALENDAR table! Returning..." % requested)
            sys.exit()
        crops = [requested]

    dataset_name = "/crop_calendar"
    key_columns = ("grid_no", "crop_no", "year")
    with pd.io.pytables.HDFStore(fname_HDFstore) as store:
        for crop in crops:
            print("Storing crop_calendar for crop %i" % crop)
            query = calendar.select().where(calendar.c.crop_no == crop)
            frame = pd.read_sql(query, engine)
            if dataset_name not in store:
                # First batch: create the dataset in appendable table format.
                store.put(dataset_name, frame, format="table",
                          data_columns=list(key_columns))
            else:
                store.append(dataset_name, frame,
                             data_columns=list(key_columns))
开发者ID:ritviksahajpal,项目名称:pcse,代码行数:31,代码来源:cgms2hdfstore.py


示例2: dump_table

def dump_table(table, filename, con, std=None, delimiter=',', format=None, encoding='utf-8', inspector=None):
    """Write a header line and every row of ``table`` to a text stream.

    Output stream selection:
      * ``std`` is used when it is given (previously it was discarded and
        replaced by ``sys.stdout``);
      * otherwise ``filename`` is opened for writing when it is a string,
        or used directly as the stream when it is not.

    ``format`` selects the row layout: a false value prints each row as-is
    with a space separated header, ``'txt'`` writes ``delimiter`` separated
    values with a comma separated header.  Any other value raises
    ``Exception`` when the first row is reached.

    When ``inspector`` is given, the columns are taken from the database
    through ``inspector.reflecttable`` instead of from the passed table
    object.  ``con`` is accepted but not used by this function.
    """
    from uliweb.utils.common import str_value
    from StringIO import StringIO
    import csv

    # Remember whether the stream was opened here so that only that case is
    # closed on exit; caller-owned streams are left open.
    opened_here = False
    if not std:
        if isinstance(filename, (str, unicode)):
            std = open(filename, 'w')
            opened_here = True
        else:
            std = filename

    try:
        #add inspector table columns process, will not use model fields but database fields
        if inspector:
            meta = MetaData()
            table = Table(table.name, meta)
            inspector.reflecttable(table, None)

        result = do_(table.select())
        fields = [x.name for x in table.c]
        if not format:
            print >>std, '#' + ' '.join(fields)
        elif format == 'txt':
            print >>std, '#' + ','.join(fields)
        for r in result:
            if not format:
                print >>std, r
            elif format == 'txt':
                # Let the csv module do the quoting, then emit the single
                # line without its trailing line terminator.
                buf = StringIO()
                fw = csv.writer(buf, delimiter=delimiter)
                fw.writerow([str_value(x, encoding=encoding) for x in r])
                print >>std, buf.getvalue().rstrip()
            else:
                raise Exception("Can't support the text format %s" % format)
    finally:
        if opened_here:
            std.close()
开发者ID:tangjn,项目名称:uliweb,代码行数:34,代码来源:commands.py


示例3: _create_shadow_tables

def _create_shadow_tables(migrate_engine):
    """Create a ``shadow_<name>`` table for every table in the database.

    Each shadow table gets a copy of every column of its source table and
    is created only when it does not exist yet (``checkfirst=True``).

    :param migrate_engine: engine used both to reflect the existing tables
        and to create the shadow tables.
    :raises Exception: whatever ``Table.create`` raised, after logging the
        table definition.
    """
    meta = MetaData(migrate_engine)
    meta.reflect(migrate_engine)
    # Take a snapshot of the names: the shadow tables built below are
    # attached to this same MetaData, so iterating the live key view would
    # see the collection change underneath it.
    table_names = list(meta.tables.keys())

    meta.bind = migrate_engine

    for table_name in table_names:
        table = Table(table_name, meta, autoload=True)

        columns = []
        for column in table.columns:
            # NOTE(boris-42): BigInteger is not supported by sqlite, so
            #                 after copy it will have NullType, other
            #                 types that are used in Nova are supported by
            #                 sqlite.
            if isinstance(column.type, NullType):
                column_copy = Column(column.name, BigInteger(), default=0)
            else:
                # Without this ``else`` the BigInteger replacement above was
                # always overwritten by a plain copy of the NullType column.
                column_copy = column.copy()
            columns.append(column_copy)

        shadow_table_name = 'shadow_' + table_name
        shadow_table = Table(shadow_table_name, meta, *columns,
                             mysql_engine='InnoDB')
        try:
            shadow_table.create(checkfirst=True)
        except Exception:
            LOG.info(repr(shadow_table))
            LOG.exception(_('Exception while creating table.'))
            raise
开发者ID:Hybrid-Cloud,项目名称:conveyor,代码行数:31,代码来源:215_meta_only.py


示例4: test_foreignkey_missing_insert

    def test_foreignkey_missing_insert(self):
        """An empty INSERT into a PK that is also a foreign key must fail.

        The check runs once with ``implicit_returning`` disabled and once
        with it enabled.
        """
        Table("t1", self.metadata, Column("id", Integer, primary_key=True))
        child_pk = Column("id", Integer, ForeignKey("t1.id"), primary_key=True)
        t2 = Table("t2", self.metadata, child_pk)
        self.metadata.create_all()

        # The failure we want is "null value in column "id" violates
        # not-null constraint" (IntegrityError on psycopg2, ProgrammingError
        # on pg8000).  What we do NOT want is "ProgrammingError:
        # (ProgrammingError) relationship "t2_id_seq" does not exist":
        # that one would mean autoincrement behaviour kicked in, which must
        # not happen here because of the foreign key.
        acceptable_errors = (exc.IntegrityError, exc.ProgrammingError)
        test_engines = [
            engines.testing_engine(options={"implicit_returning": flag})
            for flag in (False, True)
        ]

        for eng in test_engines:
            with expect_warnings(
                ".*has no Python-side or server-side default.*"
            ):
                assert_raises(acceptable_errors, eng.execute, t2.insert())
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:28,代码来源:test_query.py


示例5: update_dat_crimes

def update_dat_crimes():
    """Insert the rows listed in the "new" table into the main crime table.

    Rows of ``src_chicago_crimes_all`` whose ``id`` also appears in
    ``new_chicago_crimes_all`` are inserted into ``dat_chicago_crimes_all``
    with today's date as ``start_date``.

    Returns:
        ``'Crime Table updated'`` once the insert has been executed, or
        ``None`` when ``new_chicago_crimes_all`` does not exist.
    """
    # Step Five: Update Main Crime table
    dat_crime_table = Table('dat_chicago_crimes_all', Base.metadata,
        autoload=True, autoload_with=engine, extend_existing=True)
    src_crime_table = Table('src_chicago_crimes_all', Base.metadata,
        autoload=True, autoload_with=engine, extend_existing=True)
    try:
        new_crime_table = Table('new_chicago_crimes_all', Base.metadata,
            autoload=True, autoload_with=engine, extend_existing=True)
    except NoSuchTableError:
        return None

    # Target columns: everything except the bookkeeping columns.
    excluded_cols = ['end_date', 'current_flag', 'chicago_crimes_all_row_id']
    dat_cols = [c for c in dat_crime_table.columns.keys() if c not in excluded_cols]
    # Source columns: same exclusions plus start_date, which is supplied
    # as a literal holding today's date instead.
    excluded_cols.append('start_date')
    src_cols = [c for c in src_crime_table.columns if c.name not in excluded_cols]
    src_cols.append(text("'%s' AS start_date" % datetime.now().strftime('%Y-%m-%d')))

    new_rows = select(src_cols).select_from(
        src_crime_table.join(
            new_crime_table,
            src_crime_table.c.id == new_crime_table.c.id))
    ins = dat_crime_table.insert().from_select(dat_cols, new_rows)

    conn = engine.contextual_connect()
    try:
        conn.execute(ins)
    finally:
        # Hand the connection back even when the insert fails; it was
        # previously never closed.
        conn.close()
    return 'Crime Table updated'
开发者ID:EmilyWebber,项目名称:plenario,代码行数:26,代码来源:crime_helpers.py


示例6: __init__

 def __init__(self):
     """Create the engine, declare the three game tables and create them.

     Sets ``self.engine`` plus one attribute per table: ``games_info``,
     ``games_online`` and ``games_triggers``.
     """
     metadata = MetaData()
     # NOTE(review): the connection URL, credentials included, is hard-coded.
     self.engine = create_engine(
         'mysql://root:[email protected]:3006/games_online',
         encoding='utf-8',
         pool_recycle=7200,
         pool_size=15,
         max_overflow=30,
     )

     def declare(table_name, *column_specs):
         # Every table starts with an integer primary key named ``id``;
         # the remaining columns are plain (name, type) pairs.
         columns = [Column('id', INTEGER, primary_key=True)]
         for column_name, column_type in column_specs:
             columns.append(Column(column_name, column_type))
         return Table(table_name, metadata, *columns)

     self.games_info = declare(
         'games_info',
         ('gamecode', VARCHAR(20)),
         ('language', VARCHAR(20)),
         ('sid_api', VARCHAR(255)),
         ('online_api', VARCHAR(255)),
     )
     self.games_online = declare(
         'games_online',
         ('gamecode', VARCHAR(20)),
         ('language', VARCHAR(20)),
         ('region', VARCHAR(20)),
         ('serverid', INTEGER),
         ('online', INTEGER),
         ('time', INTEGER),
     )
     self.games_triggers = declare(
         'games_triggers',
         ('gamecode', VARCHAR(20)),
         ('language', VARCHAR(20)),
         ('region', VARCHAR(20)),
         ('serverid', INTEGER),
         ('time', INTEGER),
     )
     metadata.create_all(self.engine)
开发者ID:jiaoxingrong,项目名称:DevOps,代码行数:30,代码来源:line_util.py


示例7: _test_selfref_fk

    def _test_selfref_fk(self, recreate):
        """Check that a self-referential FK survives a batch column rename.

        Creates table ``bar`` whose ``bar_id`` column references ``bar.id``,
        inserts two rows, renames the ``data`` column inside a
        ``batch_alter_table`` block and then asserts that the foreign key
        from ``bar_id`` to ``bar.id`` is still reported by the inspector.

        :param recreate: forwarded unchanged as the ``recreate`` argument of
            ``batch_alter_table``.
        """
        bar = Table(
            'bar', self.metadata,
            Column('id', Integer, primary_key=True),
            Column('bar_id', Integer, ForeignKey('bar.id')),
            Column('data', String(50)),
            mysql_engine='InnoDB'
        )
        bar.create(self.conn)
        self.conn.execute(bar.insert(), {'id': 1, 'data': 'x', 'bar_id': None})
        self.conn.execute(bar.insert(), {'id': 2, 'data': 'y', 'bar_id': 1})

        with self.op.batch_alter_table("bar", recreate=recreate) as batch_op:
            batch_op.alter_column(
                'data', new_column_name='newdata', existing_type=String(50))

        # A single inspector is enough; it used to be built twice in a row.
        insp = Inspector.from_engine(self.conn)
        eq_(
            [(key['referred_table'],
             key['referred_columns'], key['constrained_columns'])
             for key in insp.get_foreign_keys('bar')],
            [('bar', ['id'], ['bar_id'])]
        )
开发者ID:davidszotten,项目名称:alembic,代码行数:25,代码来源:test_batch.py


示例8: upgrade

def upgrade(migrate_engine):
    """Add an ``allow_propose`` column to ``instance`` and set it to True.

    The ``instance`` table is declared explicitly rather than reflected,
    the new column is created on it, and an UPDATE then writes ``True``
    into the new column.
    """
    meta.bind = migrate_engine

    instance_columns = [
        Column('id', Integer, primary_key=True),
        Column('key', Unicode(20), nullable=False, unique=True),
        Column('label', Unicode(255), nullable=False),
        Column('description', UnicodeText(), nullable=True),
        Column('required_majority', Float, nullable=False),
        Column('activation_delay', Integer, nullable=False),
        Column('create_time', DateTime, default=func.now()),
        Column('access_time', DateTime, default=func.now(), onupdate=func.now()),
        Column('delete_time', DateTime, nullable=True),
        Column('creator_id', Integer, ForeignKey('user.id'), nullable=False),
        Column('default_group_id', Integer, ForeignKey('group.id'), nullable=True),
        Column('allow_adopt', Boolean, default=True),
        Column('allow_delegate', Boolean, default=True),
        Column('allow_index', Boolean, default=True),
        Column('hidden', Boolean, default=False),
        Column('locale', Unicode(7), nullable=True),
        Column('css', UnicodeText(), nullable=True),
        Column('use_norms', Boolean, nullable=True, default=True),
    ]
    instance_table = Table('instance', meta, *instance_columns)

    # Create the new column, then give it an explicit value.
    allow_propose = Column('allow_propose', Boolean, default=True)
    allow_propose.create(instance_table)
    backfill = instance_table.update(values={'allow_propose': True})
    migrate_engine.execute(backfill)
开发者ID:AnonOnWarpath,项目名称:adhocracy,代码行数:27,代码来源:025_instance_gets_norm_propose_option.py


示例9: SQLBackend

class SQLBackend(BaseBackend):
    """Key/value backend that keeps serialized values in one SQL table.

    The table has an integer ``id`` primary key, a unique 32 character
    ``key`` and a binary ``data`` column.  Values pass through
    ``self.serialize`` / ``self.deserialize``, which are not defined in
    this class (presumably provided by ``BaseBackend`` -- confirm).
    """

    def __init__(self, url, table_name='gimlet_channels', **engine_kwargs):
        """Bind to the database at ``url`` and create the table if missing.

        :param url: database URL handed to ``create_engine``.
        :param table_name: name of the storage table.
        :param engine_kwargs: extra keyword arguments for ``create_engine``.
        """
        meta = MetaData(bind=create_engine(url, **engine_kwargs))
        self.table = Table(table_name, meta,
                           Column('id', types.Integer, primary_key=True),
                           Column('key', types.CHAR(32), nullable=False,
                                  unique=True),
                           Column('data', types.LargeBinary, nullable=False))
        self.table.create(checkfirst=True)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, updating the row if it exists."""
        table = self.table
        key_col = table.c.key
        raw = self.serialize(value)
        # Check if this key exists with a SELECT FOR UPDATE, to protect
        # against a race with other concurrent writers of this key.
        r = table.count(key_col == key, for_update=True).scalar()
        if r:
            # If it exists, use an UPDATE.
            table.update().values(data=raw).where(key_col == key).execute()
        else:
            # Otherwise INSERT.
            table.insert().values(key=key, data=raw).execute()

    def __getitem__(self, key):
        """Return the deserialized value stored under ``key``.

        :raises KeyError: when no row exists for ``key``.
        """
        raw = select([self.table.c.data], self.table.c.key == key).scalar()
        # ``data`` is declared NOT NULL, so only a missing row yields None.
        # Testing truthiness instead would report a stored but empty payload
        # as a missing key.
        if raw is None:
            raise KeyError('key %r not found' % key)
        return self.deserialize(raw)
开发者ID:wylee,项目名称:gimlet,代码行数:31,代码来源:sql.py


示例10: test_auto_append_constraint

    def test_auto_append_constraint(self):
        """Constraints and indexes built from columns attach to the table.

        Each constraint must already be in ``t.constraints`` right after
        construction and must still be there after an explicit
        ``append_constraint``; an ``Index`` must show up in ``t.indexes``.
        """
        m = MetaData()
        t = Table('tbl', m, Column('a', Integer), Column('b', Integer))
        t2 = Table('t2', m, Column('a', Integer), Column('b', Integer))

        built_from_columns = [
            UniqueConstraint(t.c.a),
            CheckConstraint(t.c.a > 5),
            ForeignKeyConstraint([t.c.a], [t2.c.a]),
            PrimaryKeyConstraint(t.c.a),
        ]
        for constraint in built_from_columns:
            assert constraint in t.constraints
            t.append_constraint(constraint)
            assert constraint in t.constraints

        index = Index('foo', t.c.a)
        assert index in t.indexes
开发者ID:Daniel-B-Smith,项目名称:sqlalchemy,代码行数:25,代码来源:test_constraints.py


示例11: test_tometadata_ok

    def test_tometadata_ok(self):
        """``tometadata`` carries the constraints over to the copied table.

        Four constraints are built from the columns of ``tbl``; the copy
        placed in a second MetaData must report four constraints, each of
        which points back at the copy.
        """
        m = MetaData()
        t = Table('tbl', m, Column('a', Integer), Column('b', Integer))
        t2 = Table('t2', m, Column('a', Integer), Column('b', Integer))

        # Built only for their effect on ``t``; the objects themselves are
        # not needed afterwards.
        UniqueConstraint(t.c.a)
        CheckConstraint(t.c.a > 5)
        ForeignKeyConstraint([t.c.a], [t2.c.a])
        PrimaryKeyConstraint(t.c.a)

        target_meta = MetaData()
        t3 = t.tometadata(target_meta)

        eq_(len(t3.constraints), 4)
        for copied in t3.constraints:
            assert copied.table is t3
开发者ID:Daniel-B-Smith,项目名称:sqlalchemy,代码行数:26,代码来源:test_constraints.py


示例12: upgrade

def upgrade(migrate_engine):
    """Add workers table."""
    meta = MetaData()
    meta.bind = migrate_engine

    # Inherited fields from CinderBase
    base_columns = [
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean(), default=False),
    ]
    # Workers table specific fields
    worker_columns = [
        Column('id', Integer, primary_key=True),
        Column('resource_type', String(40), nullable=False),
        Column('resource_id', String(36), nullable=False),
        Column('status', String(255), nullable=False),
        Column('service_id', Integer, nullable=True),
        UniqueConstraint('resource_type', 'resource_id'),
    ]

    workers = Table(
        'workers', meta,
        *(base_columns + worker_columns),
        mysql_engine='InnoDB',
        mysql_charset='utf8'
    )
    workers.create()

    # The referenced table is reflected so that the foreign key from
    # workers.service_id to services.id can be created afterwards.
    services = Table('services', meta, autoload=True)
    service_fk = ForeignKeyConstraint(
        columns=[workers.c.service_id],
        refcolumns=[services.c.id])
    service_fk.create()
开发者ID:C2python,项目名称:cinder,代码行数:32,代码来源:076_add_workers_table.py


示例13: test_checksfor_sequence

 def test_checksfor_sequence(self):
     # The sequence is dropped and then created by hand with raw SQL before
     # the table is created with checkfirst=True.
     sequence = Sequence("fooseq")
     table = Table("mytable", self.metadata, Column("col1", Integer, sequence))
     sequence.drop()
     testing.db.execute("CREATE SEQUENCE fooseq")
     table.create(checkfirst=True)
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:7,代码来源:test_dialect.py


示例14: downgrade

def downgrade(migrate_engine):
    """Drop the ``uuid`` column from the ``virtual_interfaces`` table."""
    meta = MetaData(bind=migrate_engine)
    # Reflect the table from the database, then remove the column.
    Table('virtual_interfaces', meta, autoload=True).drop_column('uuid')
开发者ID:AnyBucket,项目名称:OpenStack-Install-and-Understand-Guide,代码行数:7,代码来源:038_add_uuid_to_virtual_interfaces.py


示例15: test_reflection_with_unique_constraint

    def test_reflection_with_unique_constraint(self):
        """A unique constraint is reflected as a constraint, not an index.

        The inspector lists ``uc_a`` both as an index and as a unique
        constraint, while a reflected ``Table`` keeps it only among its
        constraints.
        """
        insp = inspect(testing.db)

        uc_table = Table(
            'pgsql_uc', self.metadata,
            Column('a', String(10)),
            UniqueConstraint('a', name='uc_a')
        )
        uc_table.create()

        # PostgreSQL will create an implicit index for a unique
        # constraint.   Separately we get both
        index_names = {ix['name'] for ix in insp.get_indexes('pgsql_uc')}
        constraint_names = {
            uc['name'] for uc in insp.get_unique_constraints('pgsql_uc')
        }
        self.assert_('uc_a' in index_names)
        self.assert_('uc_a' in constraint_names)

        # reflection corrects for the dupe
        reflected = Table('pgsql_uc', MetaData(testing.db), autoload=True)
        reflected_index_names = {ix.name for ix in reflected.indexes}
        reflected_constraint_names = {uc.name for uc in reflected.constraints}
        self.assert_('uc_a' not in reflected_index_names)
        self.assert_('uc_a' in reflected_constraint_names)
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:27,代码来源:test_reflection.py


示例16: upgrade

def upgrade(migrate_engine):
    """Create the ``transfers`` table.

    ``volumes`` is reflected first, so it is registered on the MetaData
    before ``transfers`` declares its foreign key to ``volumes.id``.  A
    failed creation is logged and the exception re-raised.
    """
    meta = MetaData(bind=migrate_engine)

    Table('volumes', meta, autoload=True)

    # New table
    transfer_columns = [
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean),
        Column('id', String(36), primary_key=True, nullable=False),
        Column('volume_id', String(length=36), ForeignKey('volumes.id'),
               nullable=False),
        Column('display_name', String(length=255)),
        Column('salt', String(length=255)),
        Column('crypt_hash', String(length=255)),
        Column('expires_at', DateTime(timezone=False)),
    ]
    transfers = Table('transfers', meta, *transfer_columns,
                      mysql_engine='InnoDB', mysql_charset='utf8')

    try:
        transfers.create()
    except Exception:
        LOG.error(_("Table |%s| not created!"), repr(transfers))
        raise
开发者ID:KumaHo,项目名称:cinder,代码行数:29,代码来源:010_add_transfers_table.py


示例17: test_reflect_unique_index

    def test_reflect_unique_index(self):
        """A unique index is reported as an index, never as a constraint.

        Checked twice: through the inspector and through a reflected
        ``Table``.
        """
        insp = inspect(testing.db)

        # a unique index OTOH we are able to detect is an index
        # and not a unique constraint
        uc_table = Table(
            'pgsql_uc', self.metadata,
            Column('a', String(10)),
            Index('ix_a', 'a', unique=True)
        )
        uc_table.create()

        inspected_indexes = {
            ix['name']: ix for ix in insp.get_indexes('pgsql_uc')
        }
        inspected_constraints = {
            uc['name'] for uc in insp.get_unique_constraints('pgsql_uc')
        }
        self.assert_('ix_a' in inspected_indexes)
        assert inspected_indexes['ix_a']['unique']
        self.assert_('ix_a' not in inspected_constraints)

        reflected = Table('pgsql_uc', MetaData(testing.db), autoload=True)
        reflected_indexes = {ix.name: ix for ix in reflected.indexes}
        reflected_constraints = {uc.name for uc in reflected.constraints}
        self.assert_('ix_a' in reflected_indexes)
        assert reflected_indexes['ix_a'].unique
        self.assert_('ix_a' not in reflected_constraints)
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:29,代码来源:test_reflection.py


示例18: upgrade

def upgrade(migrate_engine):
    """Create the ``milestone`` table and hook it into existing tables.

    Adds a nullable ``milestone_id`` foreign key column to ``delegateable``
    and a ``milestones`` boolean column to ``instance``, then writes
    ``False`` into the new ``milestones`` column.
    """
    meta.bind = migrate_engine

    # Reflect the existing tables; ``user`` is not used directly below but
    # is the target of milestone.creator_id.
    Table('user', meta, autoload=True)
    instance_table = Table('instance', meta, autoload=True)
    delegateable_table = Table('delegateable', meta, autoload=True)

    milestone_columns = [
        Column('id', Integer, primary_key=True),
        Column('instance_id', Integer, ForeignKey('instance.id'), nullable=False),
        Column('creator_id', Integer, ForeignKey('user.id'), nullable=False),
        Column('title', Unicode(255), nullable=True),
        Column('text', UnicodeText(), nullable=True),
        Column('time', DateTime),
        Column('create_time', DateTime, default=datetime.utcnow),
        Column('delete_time', DateTime),
    ]
    milestone_table = Table('milestone', meta, *milestone_columns)
    milestone_table.create()

    milestone_ref = Column('milestone_id', Integer,
                           ForeignKey('milestone.id'), nullable=True)
    milestone_ref.create(delegateable_table)

    milestones_flag = Column('milestones', Boolean, default=False)
    milestones_flag.create(instance_table)
    backfill = instance_table.update(values={'milestones': False})
    migrate_engine.execute(backfill)
开发者ID:AnonOnWarpath,项目名称:adhocracy,代码行数:25,代码来源:028_milestones.py


示例19: upgrade

def upgrade(migrate_engine):
    """Create the ``task_log`` table.

    If creation fails the table is dropped again and the exception is
    re-raised.  On MySQL the table is switched to InnoDB afterwards.
    """
    meta = MetaData()
    meta.bind = migrate_engine

    def required_text(name, **options):
        # NOT NULL String(255) column; ``options`` carries e.g. index=True.
        return Column(name, String(255), nullable=False, **options)

    # create new table
    task_log_columns = [
        Column('created_at', DateTime(timezone=False)),
        Column('updated_at', DateTime(timezone=False)),
        Column('deleted_at', DateTime(timezone=False)),
        Column('deleted', Boolean(create_constraint=True, name=None)),
        Column('id', Integer(), primary_key=True, nullable=False,
               autoincrement=True),
        required_text('task_name'),
        required_text('state'),
        required_text('host', index=True),
        required_text('period_beginning', index=True),
        required_text('period_ending', index=True),
        required_text('message'),
        Column('task_items', Integer()),
        Column('errors', Integer()),
    ]
    task_log = Table('task_log', meta, *task_log_columns)

    try:
        task_log.create()
    except Exception:
        meta.drop_all(tables=[task_log])
        raise

    if migrate_engine.name == "mysql":
        migrate_engine.execute("ALTER TABLE task_log Engine=InnoDB")
开发者ID:SimiPro,项目名称:nova,代码行数:34,代码来源:108_task_log.py


示例20: _get_non_cell0_mappings

 def _get_non_cell0_mappings():
     """Queries the API database for non-cell0 cell mappings."""
     api_meta = MetaData(bind=db_session.get_api_engine())
     cell_mappings = Table('cell_mappings', api_meta, autoload=True)
     # Keep every mapping whose uuid differs from the cell0 uuid.
     query = cell_mappings.select().where(
         cell_mappings.c.uuid != cell_mapping_obj.CellMapping.CELL0_UUID)
     return query.execute().fetchall()
开发者ID:klmitch,项目名称:nova,代码行数:7,代码来源:status.py



注:本文中的sqlalchemy.Table类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python mssql.dialect函数代码示例发布时间:2022-05-27
下一篇:
Python sqlalchemy.MetaData类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap