• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sqlalchemy.table函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.table函数的典型用法代码示例。如果您正苦于以下问题:Python table函数的具体用法?Python table怎么用?Python table使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了table函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: upgrade

def upgrade():
    conn = op.get_bind()

    cocols = [
        'created', 'updated', 'active', 'version', 'polymorphic_type',
        'id', 'name',
    ]
    co = sa.table('contribution', *map(sa.column, ['pk'] + cocols))
    chcols = ['pk', 'sortkey']
    ch = sa.table('chapter', *map(sa.column, chcols))

    id_, name = map(sa.bindparam, ['id_', 'name'])

    cowhere = [co.c.id == id_, co.c.name == name]

    insert_co = co.insert(bind=conn).from_select(cocols,
        sa.select([sa.func.now(), sa.func.now(), True, 1, sa.literal('custom'), id_, name])
        .where(~sa.exists().where(sa.or_(*cowhere))))

    co_pk = sa.select([co.c.pk]).where(sa.and_(*cowhere)).as_scalar()
    sortkey = sa.bindparam('sortkey')

    insert_ch = ch.insert(bind=conn).from_select(chcols,
        sa.select([co_pk, sortkey])
        .where(~sa.exists().where(ch.c.pk == co.c.pk).where(sa.or_(*cowhere)))
        .where(sa.exists().where(sa.and_(*cowhere))))

    insert_co.execute(id_=ID, name=NAME)
    insert_ch.execute(id_=ID, name=NAME, sortkey=SORTKEY)
开发者ID:clld,项目名称:wals3,代码行数:29,代码来源:39c1fbfc53ce_add_chapter_s2.py


示例2: upgrade

def upgrade():
    conn = op.get_bind()

    l = sa.table('language', *map(sa.column, ['pk', 'id', 'name']))
    ccols = ['created', 'updated', 'active', 'id', 'name', 'continent']
    c = sa.table('country', *map(sa.column, ['pk'] + ccols))
    lccols = ['created', 'updated', 'active', 'language_pk', 'country_pk']
    lc = sa.table('countrylanguage', *map(sa.column, lccols))

    lwhere = (l.c.id == sa.bindparam('id_'))

    cid, cname, ccont = map(sa.bindparam, ['cc', 'name', 'continent'])
    cwhere = (c.c.id == cid)

    insert_c = c.insert(bind=conn).from_select(ccols,
        sa.select([sa.func.now(), sa.func.now(), True, cid, cname, ccont])
        .where(~sa.exists().where(cwhere)))

    liwhere = sa.exists()\
        .where(lc.c.language_pk == l.c.pk).where(lwhere)\
        .where(lc.c.country_pk == c.c.pk).where(cwhere)

    unlink_country = lc.delete(bind=conn).where(liwhere)

    l_pk = sa.select([l.c.pk]).where(lwhere).as_scalar()
    c_pk = sa.select([c.c.pk]).where(cwhere).as_scalar()

    link_country = lc.insert(bind=conn).from_select(lccols,
        sa.select([sa.func.now(), sa.func.now(), True, l_pk, c_pk])
        .where(~liwhere))

    insert_c.execute(cc=AFTER, name=NAME, continent=CONTINENT)
    for id_ in IDS:
        unlink_country.execute(id_=id_, cc=BEFORE)
        link_country.execute(id_=id_, cc=AFTER)
开发者ID:clld,项目名称:wals3,代码行数:35,代码来源:15da24f7a5af_change_country_from_sd_to_ss.py


示例3: upgrade

def upgrade():
    ### commands auto generated by Alembic - please adjust! ###
    subsc = op.create_table(
        'subscription',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('user_id', sa.Integer(), sa.ForeignKey('user.id'), nullable=False),
        sa.Column('feed_id', sa.Integer(), sa.ForeignKey('feed.id'), nullable=False),
        sa.Column('name', sa.String(length=256), nullable=True),
        sa.Column('tags', sa.String(length=256), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )

    feed = sa.table(
        'feed',
        sa.column('id', sa.Integer()),
        sa.column('name', sa.String()))

    u2f = sa.table(
        'users_to_feeds',
        sa.column('user_id', sa.Integer()),
        sa.column('feed_id', sa.Integer()))

    values = sa.select(
        [u2f.c.user_id, u2f.c.feed_id, feed.c.name]
    ).select_from(
        u2f.join(feed, feed.c.id == u2f.c.feed_id)
    )

    op.execute(subsc.insert().from_select(
        ['user_id', 'feed_id', 'name'], values))

    op.drop_table('users_to_feeds')
开发者ID:Lancey6,项目名称:woodwind,代码行数:32,代码来源:564f5a5061f_.py


示例4: execute

    def execute(self, connection, filter_values):
        max_date_query = sqlalchemy.select([
            sqlalchemy.func.max(sqlalchemy.column('completed_on')).label('completed_on'),
            sqlalchemy.column('case_id').label('case_id')
        ]).select_from(sqlalchemy.table(self.table_name))

        if self.filters:
            for filter in self.filters:
                max_date_query.append_whereclause(filter.build_expression())

        max_date_query.append_group_by(
            sqlalchemy.column('case_id')
        )

        max_date_subquery = sqlalchemy.alias(max_date_query, 'max_date')

        asha_table = self.get_asha_table_name()
        checklist_query = sqlalchemy.select()
        for column in self.columns:
            checklist_query.append_column(column.build_column())

        checklist_query = checklist_query.where(
            sqlalchemy.literal_column('"{}".case_id'.format(asha_table)) == max_date_subquery.c.case_id
        ).where(
            sqlalchemy.literal_column('"{}".completed_on'.format(asha_table)) == max_date_subquery.c.completed_on
        ).select_from(sqlalchemy.table(asha_table))

        return connection.execute(checklist_query, **filter_values).fetchall()
开发者ID:kkrampa,项目名称:commcare-hq,代码行数:28,代码来源:sql_data.py


示例5: test_unconsumed_names_values_dict

    def test_unconsumed_names_values_dict(self):
        t = table("t", column("x"), column("y"))
        t2 = table("t2", column("q"), column("z"))

        assert_raises_message(
            exc.CompileError,
            "Unconsumed column names: j",
            t.update().values(x=5, j=7).values({t2.c.z: 5}).
            where(t.c.x == t2.c.q).compile,
        )
开发者ID:eoghanmurray,项目名称:sqlalchemy,代码行数:10,代码来源:test_update.py


示例6: test_compile_with_one_unnamed_table

def test_compile_with_one_unnamed_table():
    t = ibis.table([('a', 'string')])
    s = ibis.table([('b', 'string')], name='s')
    join = t.join(s, t.a == s.b)
    result = ibis.sqlite.compile(join)
    sqla_t = sa.table('t0', sa.column('a', sa.String)).alias('t0')
    sqla_s = sa.table('s', sa.column('b', sa.String)).alias('t1')
    sqla_join = sqla_t.join(sqla_s, sqla_t.c.a == sqla_s.c.b)
    expected = sa.select([sqla_t.c.a, sqla_s.c.b]).select_from(sqla_join)
    assert str(result) == str(expected)
开发者ID:deepfield,项目名称:ibis,代码行数:10,代码来源:test_functions.py


示例7: upgrade

def upgrade():
    conn = op.get_bind()

    language = sa.table('language', *map(sa.column, ['pk', 'id', 'name', 'updated']))
    lid = sa.bindparam('id_')
    lbefore = sa.bindparam('before')
    update_lang = sa.update(language, bind=conn)\
        .where(sa.and_(
            language.c.id == lid,
            language.c.name == lbefore))\
        .values(updated=sa.func.now(), name=sa.bindparam('after'))

    walslanguage = sa.table('walslanguage', *map(sa.column, ['pk', 'ascii_name']))
    aname = sa.bindparam('ascii_name')
    update_wals = sa.update(walslanguage, bind=conn)\
        .where(sa.exists().where(sa.and_(
            language.c.pk == walslanguage.c.pk,
            language.c.id == lid))\
        .where(walslanguage.c.ascii_name != aname))\
        .values(ascii_name=aname)

    icols = ['created', 'updated', 'active', 'version', 'type', 'description', 'lang', 'name']
    identifier = sa.table('identifier', *map(sa.column, ['pk'] + icols))
    itype, idesc, ilang = (sa.bindparam(*a) for a in [('type', 'name'), ('description', 'other'), ('lang', 'en')])
    iname = sa.bindparam('name')
    iwhere = sa.and_(
        identifier.c.type == itype,
        identifier.c.description == idesc,
        identifier.c.lang == ilang,
        identifier.c.name == iname)
    insert_ident = sa.insert(identifier, bind=conn).from_select(icols,
        sa.select([sa.func.now(), sa.func.now(), True, 1, itype, idesc, ilang, iname])
        .where(~sa.exists().where(iwhere)))

    licols = ['created', 'updated', 'active', 'version', 'language_pk', 'identifier_pk']
    languageidentifier = sa.table('languageidentifier', *map(sa.column, licols))
    l_pk = sa.select([language.c.pk]).where(language.c.id == lid)
    i_pk = sa.select([identifier.c.pk]).where(sa.and_(iwhere))
    insert_lang_ident = sa.insert(languageidentifier, bind=conn).from_select(licols,
        sa.select([sa.func.now(), sa.func.now(), True, 1, l_pk.as_scalar(), i_pk.as_scalar()])
        .where(~sa.exists().where(sa.and_(
            languageidentifier.c.language_pk == l_pk,
            languageidentifier.c.identifier_pk == i_pk))))

    for id_, (before, after, keep) in sorted(ID_BEFORE_AFTER_KEEP.items()):
        update_lang.execute(id_=id_, before=before, after=after)
        update_wals.execute(id_=id_, ascii_name=ascii_name(after))
        if keep:
            insert_ident.execute(name=before)
            insert_lang_ident.execute(id_=id_, name=before)
开发者ID:clld,项目名称:wals3,代码行数:50,代码来源:eb2efcd10cf_rename_languages.py


示例8: test_compare_tables

    def test_compare_tables(self):
        is_true(table_a.compare(table_a_2))

        # the "proxy" version compares schema tables on metadata identity
        is_false(table_a.compare(table_a_2, use_proxies=True))

        # same for lower case tables since it compares lower case columns
        # using proxies, which makes it very unlikely to have multiple
        # table() objects with columns that compare equally
        is_false(
            table("a", column("x", Integer), column("q", String)).compare(
                table("a", column("x", Integer), column("q", String)),
                use_proxies=True,
            )
        )
开发者ID:monetate,项目名称:sqlalchemy,代码行数:15,代码来源:test_compare.py


示例9: upgrade

def upgrade():
    from sqlalchemy.sql import text
    enum_values_table = sa.table('enum_values',
                                 sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
                                 sa.Column('type_id', sa.Integer(), nullable=True),
                                 sa.Column('code', sa.String(length=32), nullable=True),
                                 sa.Column('display', sa.String(length=64), nullable=False),
                                 )
    res = op.get_bind().execute('SELECT max(id)+1 FROM enum_values')
    results = res.fetchall()
    cm = 14
    for r in results:
        cm = r[0]
    op.bulk_insert(enum_values_table, [
        {'id': cm, 'type_id': 1, 'code': 'INVENTORY_TRANSACTION_TYPE', 'display': u'库存变动类型'},
        {'id': cm + 1, 'type_id': 1, 'code': 'RECEIVING_STATUS', 'display': u'收货单状态'},
        {'id': cm + 2, 'type_id': 1, 'code': 'PURCHASE_ORDER_STATUS', 'display': u'采购单状态'},
        {'id': cm + 3, 'type_id': 1, 'code': 'SHIPPING_STATUS', 'display': u'发货单状态'},
        {'id': cm + 4, 'type_id': cm, 'code': 'PURCHASE_IN', 'display': u'采购入库'},
        {'id': cm + 5, 'type_id': cm, 'code': 'SALES_OUT', 'display': u'销售出库'},
        {'id': cm + 6, 'type_id': cm, 'code': 'INVENTORY_DAMAGED', 'display': u'商品损毁'},
        {'id': cm + 7, 'type_id': cm, 'code': 'INVENTORY_LOST', 'display': u'商品丢失'},
        {'id': cm + 8, 'type_id': cm + 1, 'code': 'RECEIVING_DRAFT', 'display': u'收货单草稿'},
        {'id': cm + 9, 'type_id': cm + 1, 'code': 'RECEIVING_COMPLETE', 'display': u'收货单完成'},
        {'id': cm + 10, 'type_id': cm + 2, 'code': 'PURCHASE_ORDER_DRAFT', 'display': u'草稿'},
        {'id': cm + 11, 'type_id': cm + 2, 'code': 'PURCHASE_ORDER_ISSUED', 'display': u'已发出'},
        {'id': cm + 12, 'type_id': cm + 2, 'code': 'PURCHASE_ORDER_PART_RECEIVED', 'display': u'部分收货'},
        {'id': cm + 13, 'type_id': cm + 2, 'code': 'PURCHASE_ORDER_RECEIVED', 'display': u'收货完成'},
        {'id': cm + 14, 'type_id': cm + 3, 'code': 'SHIPPING_COMPLETE', 'display': u'发货完成'},
    ], multiinsert=False)
    op.get_bind().execute(text("ALTER SEQUENCE enum_values_id_seq RESTART WITH " + str(cm + 15) + ";"))
开发者ID:betterlife,项目名称:psi,代码行数:31,代码来源:12_f1603288979d_.py


示例10: __init__

    def __init__(self, name, query,
                 column_names=None,
                 temporary=False,
                 view_options=None,
                 check_option=None):
        """DDL Element representing a VIEW

        :param name: The name of the view
        :param query: the query it represents
        :param column_names:
        :param temporary:
        :param view_options: Must be something that can be passed to
            OrderedDict, so a simple dict suffices.
        :param check_option: Must be one of ``None``, ``'local'``,
            ``'cascaded'``.
        """
        self.name = name
        self.query = query
        self.table = table(name)
        self.temporary = temporary
        self.column_names = column_names
        self._init_table_columns()
        if view_options is None:
            view_options = OrderedDict()
        else:
            view_options = OrderedDict(view_options)
        self.view_options = view_options
        if check_option not in (None, 'local', 'cascaded'):
            raise ValueError("check_option must be either None, 'local', or "
                             "'cascaded'")
        if check_option is not None and 'check_option' in view_options:
            raise ValueError('Parameter "check_option" specified more than '
                             'once')
        self.check_option = check_option
开发者ID:agdsn,项目名称:pycroft,代码行数:34,代码来源:ddl.py


示例11: upgrade

def upgrade():
    # step 1, add new uuid column
    op.add_column("comparison", sa.Column("uuid", sa.CHAR(22), nullable=False, server_default=""))

    connection = op.get_bind()
    # step 2, fill in unique uuids (base64 url safe strings)
    comparison_table = sa.table("comparison", sa.Column("id", sa.Integer()), sa.Column("uuid", sa.CHAR(22)))
    for record in connection.execute(comparison_table.select()):
        connection.execute(
            comparison_table.update()
            .where(comparison_table.c.id == record.id)
            .values(uuid=str(base64.urlsafe_b64encode(uuid.uuid4().bytes)).replace("=", ""))
        )

    # step 3, apply unique constraint on generated table
    with op.batch_alter_table("comparison", naming_convention=convention) as batch_op:
        batch_op.create_unique_constraint("uq_comparison_uuid", ["uuid"])

    # step 4 create xapi_log table
    op.create_table(
        "xapi_log",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("statement", sa.Text(), nullable=True),
        sa.Column("modified_user_id", sa.Integer(), nullable=True),
        sa.Column("modified", sa.DateTime(), nullable=False),
        sa.Column("created_user_id", sa.Integer(), nullable=True),
        sa.Column("created", sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(["created_user_id"], ["user.id"], ondelete="SET NULL"),
        sa.ForeignKeyConstraint(["modified_user_id"], ["user.id"], ondelete="SET NULL"),
        sa.PrimaryKeyConstraint("id"),
        mysql_charset="utf8",
        mysql_collate="utf8_unicode_ci",
        mysql_engine="InnoDB",
    )
开发者ID:ubc,项目名称:acj-versus,代码行数:34,代码来源:31fc9a032aa8_add_uuid_to_comparison_table.py


示例12: upgrade

def upgrade():
    op.create_table(
        u'client_authentication_mode',
        sa.Column(u'name', sa.String(10), primary_key=True),
    )

    # Create temporary table for table data seeding
    insert_table = sa.table(
        u'client_authentication_mode',
        sa.column(u'name', sa.String),
    )

    op.bulk_insert(
        insert_table,
        [
            {'name': constants.CLIENT_AUTH_NONE},
            {'name': constants.CLIENT_AUTH_OPTIONAL},
            {'name': constants.CLIENT_AUTH_MANDATORY}
        ]
    )

    op.add_column(
        u'listener',
        sa.Column(u'client_authentication', sa.String(10),
                  sa.ForeignKey('client_authentication_mode.name'),
                  server_default=constants.CLIENT_AUTH_NONE, nullable=False)
    )
开发者ID:openstack,项目名称:octavia,代码行数:27,代码来源:f21ae3f21adc_add_client_auth_option.py


示例13: upgrade

def upgrade():
    import os
    import hashlib
    HASH_SIZE = 32  # sha256 -> 32 bytes
    default_password = '123456'

    # current users are by default active
    op.add_column('users', sa.Column('_active', sa.Boolean(), nullable=True,
                                     server_default=sa.sql.expression.true()))

    # reate the new columns
    op.add_column('users', sa.Column('_salt', sa.Binary(length=HASH_SIZE), nullable=True))
    op.add_column('users', sa.Column('_hashed_password', sa.Binary(length=HASH_SIZE), nullable=True))

    # Store the salt and the hashed default password
    users = sa.table('users', sa.Column('id'),
                     sa.Column('_salt'),
                     sa.Column('_hashed_password'))
    connection = op.get_bind()
    for user in connection.execute(sa.select([sa.column('id')]).select_from(sa.text('users'))):
        if user:
            salt = os.urandom(HASH_SIZE)
            pwd = hashlib.sha256(salt + bytes(default_password, encoding='utf8')).digest()
            connection.execute(
                users.update().where(
                    users.c.id == user.id
                ).values(
                    _salt=salt,
                    _hashed_password=pwd,
                )
            )

    # alter columns to be not nullable
    op.alter_column('users', '_salt', nullable=False)
    op.alter_column('users', '_hashed_password', nullable=False)
开发者ID:serman,项目名称:autoconstruccion,代码行数:35,代码来源:575c91adb1b_20151216_adduser_authentication_salt_and_hashed_.py


示例14: test_legacy_typemap

    def test_legacy_typemap(self):
        table1 = table(
            "mytable",
            column("myid", Integer),
            column("name", String),
            column("description", String),
        )
        with testing.expect_deprecated(
            "The text.typemap parameter is deprecated"
        ):
            t = text(
                "select id, name from user",
                typemap=dict(id=Integer, name=String),
            )

        stmt = select([table1.c.myid]).select_from(
            table1.join(t, table1.c.myid == t.c.id)
        )
        compiled = stmt.compile()
        eq_(
            compiled._create_result_map(),
            {
                "myid": (
                    "myid",
                    (table1.c.myid, "myid", "myid"),
                    table1.c.myid.type,
                )
            },
        )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:29,代码来源:test_deprecations.py


示例15: upgrade

def upgrade():
    op.add_column('writeups', sa.Column('author', sa.Unicode(length=100), nullable=True))
    t_w = sa.table(
        'writeups',
        sa.column('id', sa.Integer),
        sa.column('author', sa.String),
    )
    t_wp = sa.table(
        'writeup_posts',
        sa.column('writeup_id', sa.Integer),
        sa.column('author', sa.String),
        sa.column('index', sa.Integer),
    )
    stmt = sa.select([t_wp.c.author]).where(sa.and_(t_wp.c.writeup_id == t_w.c.id, t_wp.c.index == 1))
    op.execute(t_w.update().values(author=stmt))
    op.alter_column('writeups', 'author', nullable=False)
开发者ID:inklesspen,项目名称:mimir,代码行数:16,代码来源:41d36feab50_add_author_to_writeup.py


示例16: test_insert_from_select_dont_mutate_raw_columns

    def test_insert_from_select_dont_mutate_raw_columns(self):
        # test [ticket:3603]
        from sqlalchemy import table

        table_ = table(
            "mytable",
            Column("foo", String),
            Column("bar", String, default="baz"),
        )

        stmt = select([table_.c.foo])
        insert = table_.insert().from_select(["foo"], stmt)

        self.assert_compile(stmt, "SELECT mytable.foo FROM mytable")
        self.assert_compile(
            insert,
            "INSERT INTO mytable (foo, bar) "
            "SELECT mytable.foo, :bar AS anon_1 FROM mytable",
        )
        self.assert_compile(stmt, "SELECT mytable.foo FROM mytable")
        self.assert_compile(
            insert,
            "INSERT INTO mytable (foo, bar) "
            "SELECT mytable.foo, :bar AS anon_1 FROM mytable",
        )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:25,代码来源:test_insert.py


示例17: test_unconsumed_names_kwargs

 def test_unconsumed_names_kwargs(self):
     t = table("t", column("x"), column("y"))
     assert_raises_message(
         exc.CompileError,
         "Unconsumed column names: z",
         t.insert().values(x=5, z=5).compile,
     )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:7,代码来源:test_insert.py


示例18: downgrade

def downgrade():
    # FIXME: this adds extraneous commas
    return
    log = sa.table('log', sa.column('type', sa.String), sa.column('msg', sa.String))
    rows = op.get_bind().execute(log.select().where(log.c.type == 'kick')).fetchall()
    values = [{'old_msg': x.msg, 'msg': x.msg.replace(' ', ',', 1)} for x in rows]
    op.get_bind().execute(log.update().where(log.c.msg == sa.bindparam('old_msg')).values(msg=sa.bindparam('msg')), values)
开发者ID:Polarcraft,项目名称:KbveBot,代码行数:7,代码来源:3614b38ddf9_kick_logging_change.py


示例19: data_to_db

def data_to_db():
    # Connect to DB
    db = sqlalchemy.create_engine('postgresql://user:[email protected]/mydatabase')
    engine = db.connect()  
    meta = sqlalchemy.MetaData(engine) 
    meta.reflect(bind=engine)

    # Create/Extend table
    try:
        table = sqlalchemy.Table("jsontable", 
                                 meta,               
                                 Column('idstr', Text, primary_key=True),
                                 Column('created_at', Text),
                                 Column('author', Text),
                                 Column('text', Text),
                                 Column('urls', Text),
                                 Column('platform', Text),
                                 extend_existing=True)
        table.create(engine) 
    except sqlalchemy.exc.ProgrammingError as e:
        print("Table already existis")                                              
        pass

    # Upsert entry
    record = sqlalchemy.table("jsontable",
                              Column('idstr', Text),
                              Column('created_at', Text),
                              Column('author', Text), 
                              Column('text', Text),
                              Column('urls', Text),
                              Column('platform', Text))
    records = 0
    consumer = KafkaConsumer('data', bootstrap_servers=['kafka:9092'])
    for msg in consumer:                                                
        msg = json.loads(msg.value.decode('utf-8'))                                        
        # Get rid of any non-ascii chars
        for k,v in msg.items():
            if isinstance(v, str):
                msg[k] = ''.join([i if ord(i) < 128 else ' ' for i in v])

            # Insert row if not already existing
            try:
                statement = record.insert().values(idstr = msg['idstr'],
                                                   created_at = msg['created_at'],
                                                   author = msg['author'],
                                                   text = msg['text'],
                                                   urls = msg['urls'],
                                                   platform = msg['platform'])

                engine.execute(statement)
                records += 1
                with open('test.txt','a') as f:
                    f.write(json.dumps(msg)+'\n')
                print(records)
            except sqlalchemy.exc.IntegrityError as e:                                       
                continue     


    return records
开发者ID:tamilyn,项目名称:quorum,代码行数:59,代码来源:store_data_fancy.py


示例20: upgrade

def upgrade():
    log = sa.table('log', sa.column('type', sa.String), sa.column('msg', sa.String))
    rows = op.get_bind().execute(log.select().where(log.c.type == 'kick').where(log.c.msg.like('%,%'))).fetchall()
    rows = [x for x in rows if ',' in x.msg and x.msg.find(',') < x.msg.find(' ')]
    if not rows:
        return
    values = [{'old_msg': x.msg, 'msg': x.msg.replace(',', ' ', 1)} for x in rows]
    op.get_bind().execute(log.update().where(log.c.msg == sa.bindparam('old_msg')).values(msg=sa.bindparam('msg')), values)
开发者ID:N6UDP,项目名称:cslbot,代码行数:8,代码来源:3614b38ddf9_kick_logging_change.py



注:本文中的sqlalchemy.table函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sqlalchemy.tuple_函数代码示例发布时间:2022-05-27
下一篇:
Python sqlalchemy.select函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap