• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sql.text函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.sql.text函数的典型用法代码示例。如果您正苦于以下问题:Python text函数的具体用法?Python text怎么用?Python text使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了text函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: upgrade

def upgrade():
    op.drop_constraint("queues_pkey", "queues")
    op.execute(CreateSequence(Sequence("queues_id_seq")))
    op.add_column("queues", Column("id", Integer, server_default=text("nextval('queues_id_seq'::regclass)")))
    op.create_primary_key("queues_pkey", "queues", ["id"])

    Email.__table__.create(bind=engine)
    op.create_table(
        "queue_notifications",
        Column("queue_id", Integer, ForeignKey("queues.id")),
        Column("email_id", Integer, ForeignKey("emails.id")),
    )
    op.add_column("users", Column("email_id", Integer, ForeignKey("emails.id")))

    conn = op.get_bind()
    s = select([text("users.email")]).select_from(text("users"))
    users = conn.execute(s)
    table_user = User.__table__
    for row in users:
        ins = Email.__table__.insert().values(address=row["email"])
        result_insert_email = conn.execute(ins)
        upd = (
            table_user.update()
            .values(email_id=result_insert_email.inserted_primary_key[0])
            .where(text("users.email = :email"))
        )
        conn.execute(upd, email=row["email"])

    op.drop_column("users", "email")
开发者ID:yaoweizhen,项目名称:pulseguardian,代码行数:29,代码来源:2d19bced283e_move_email_and_associate_to_queue.py


示例2: update_view_count_to_db

def update_view_count_to_db(url):
    try:
        return
        original_url = url
        count = redis_views.get(url)
        
        url = url.replace('http://d35wlof4jnjr70.cloudfront.net/', 'https://s3.amazonaws.com/franklymestorage/')
        types = ['_ultralow', '_low', '_medium', '_opt', '_promo']
        for suffix in types:
            url = url.replace(suffix, '')
        
        if count:
            db.session.execute(text("""UPDATE users 
                                        SET view_count=view_count+:count, total_view_count=total_view_count+:count
                                        WHERE profile_video=:url
                                    """),
                                params = {"url":url, "count":int(count)}
                            )
            db.session.execute(text("""UPDATE posts 
                                        SET view_count=view_count+:count 
                                        WHERE media_url=:url
                                    """),
                                params = {"url":url, "count":int(count)}
                            )
            db.session.commit()

        redis_views.delete(original_url)
    except:
        db.session.rollback()
开发者ID:Aranjedeath,项目名称:fbackend,代码行数:29,代码来源:video_db.py


示例3: get_candidate_tasks

def get_candidate_tasks(app_id, user_id=None, user_ip=None, n_answers=30, offset=0):
    """Gets all available tasks for a given project and user"""
    rows = None
    if user_id and not user_ip:
        query = text('''
                     SELECT id FROM task WHERE NOT EXISTS
                     (SELECT task_id FROM task_run WHERE
                     app_id=:app_id AND user_id=:user_id AND task_id=task.id)
                     AND app_id=:app_id AND state !='completed'
                     ORDER BY priority_0 DESC, id ASC LIMIT 10''')
        rows = session.execute(query, dict(app_id=app_id, user_id=user_id))
    else:
        if not user_ip:
            user_ip = '127.0.0.1'
        query = text('''
                     SELECT id FROM task WHERE NOT EXISTS
                     (SELECT task_id FROM task_run WHERE
                     app_id=:app_id AND user_ip=:user_ip AND task_id=task.id)
                     AND app_id=:app_id AND state !='completed'
                     ORDER BY priority_0 DESC, id ASC LIMIT 10''')
        rows = session.execute(query, dict(app_id=app_id, user_ip=user_ip))

    tasks = []
    for t in rows:
        tasks.append(session.query(Task).get(t.id))
    return tasks
开发者ID:alisheikh,项目名称:pybossa,代码行数:26,代码来源:sched.py


示例4: downgrade

def downgrade():
    conn = op.get_bind()
    conn.execute(
        text(
            "ALTER TABLE contact"
            " ADD CONSTRAINT contact_ibfk_1 FOREIGN KEY"
            " (namespace_id) REFERENCES namespace(id)"
        )
    )

    conn.execute(
        text(
            "ALTER TABLE phonenumber"
            " ADD CONSTRAINT phonenumber_ibfk_1 FOREIGN KEY"
            " (contact_id) REFERENCES contact(id)"
        )
    )

    conn.execute(
        text(
            "ALTER TABLE messagecontactassociation"
            " ADD CONSTRAINT messagecontactassociation_ibfk_1"
            " FOREIGN KEY (contact_id) REFERENCES contact(id)"
        )
    )
开发者ID:nylas,项目名称:sync-engine,代码行数:25,代码来源:231_drop_contact_foreign_keys.py


示例5: upgrade

def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('product', sa.Column('create_date', sa.DateTime(), nullable=True))

    results = op.get_bind().execute(text("""
    select prd.id, min(po.order_date) from purchase_order po, product prd, purchase_order_line pol
    where pol.product_id = prd.id and po.id = pol.purchase_order_id
    group by prd.id
    """)).fetchall()
    for r in results:
        sup_id = r[0]
        po_date = r[1]
        sql = "update product set create_date = '{0}' where id={1}".format(po_date, sup_id)
        op.get_bind().execute(text(sql))

    results = op.get_bind().execute(text("""
    select p.id, min(so.order_date) from sales_order so, sales_order_line sol,
    product p where so.id = sol.sales_order_id and
    sol.product_id = p.id group by p.id;
    """)).fetchall()
    for r in results:
        sup_id = r[0]
        so_date = r[1]
        sql = "update product set create_date = '{0}' where id={1} and create_date is null".format(so_date, sup_id)
        op.get_bind().execute(text(sql))

    op.get_bind().execute(text("update product set create_date = '{0}' where create_date is null".format(datetime.now())))
    op.alter_column('product', 'create_date', existing_type=sa.DateTime(), nullable=False)
开发者ID:betterlife,项目名称:psi,代码行数:28,代码来源:42_67607ed6ab04_.py


示例6: get_table_names

 def get_table_names(self, connection, schema=None, **kw):
     if schema is None:
         schema = self.default_schema_name
     if schema is None:
         result = connection.execute(
             text("SELECT TABLE_NAME FROM DB..TABLES"))
         return [r[0] for r in result]
     if '.' not in schema:
         schema += '.'
     catalog, schema = schema.split('.', 1)
     if catalog:
         if schema:
             result = connection.execute(
                 text("SELECT TABLE_NAME FROM DB..TABLES WHERE"
                      "TABLE_CATALOG=:catalog AND TABLE_SCHEMA = :schema"),
                 catalog=catalog, schema=schema)
         else:
             result = connection.execute(
                 text("SELECT TABLE_NAME FROM DB..TABLES WHERE"
                      "TABLE_CATALOG=:catalog"), catalog=catalog)
     else:
         result = connection.execute(
             text("SELECT TABLE_NAME FROM DB..TABLES WHERE"
                  "TABLE_SCHEMA=:schema"), schema=schema)
     return [r[0] for r in result]
开发者ID:philippeluickx,项目名称:virtuoso-python,代码行数:25,代码来源:alchemy.py


示例7: test_filtered_counting

    def test_filtered_counting( self ):
        user2 = self.user_manager.create( **user2_data )
        history = self.history_manager.create( name='history', user=user2 )
        contents = []
        contents.extend([ self.add_hda_to_history( history, name=( 'hda-' + str( x ) ) ) for x in xrange( 3 ) ])
        contents.append( self.add_list_collection_to_history( history, contents[:3] ) )
        contents.extend([ self.add_hda_to_history( history, name=( 'hda-' + str( x ) ) ) for x in xrange( 4, 6 ) ])
        contents.append( self.add_list_collection_to_history( history, contents[4:6] ) )

        self.log( "should show correct count with filters" )
        self.hda_manager.delete( contents[1] )
        self.hda_manager.delete( contents[4] )
        contents[6].deleted = True
        self.app.model.context.flush()

        contents[2].visible = False
        contents[5].visible = False
        contents[6].visible = False
        self.app.model.context.flush()

        HDA = self.hda_manager.model_class
        self.assertEqual( self.contents_manager.contents_count( history, filters=[ HDA.deleted == true() ] ), 3 )
        filters = [ text( 'visible = 0' ) ]
        self.assertEqual( self.contents_manager.contents_count( history, filters=filters ), 3 )

        filters = [ text( 'deleted = 1' ), text( 'visible = 0' ) ]
        self.assertEqual( self.contents_manager.contents_count( history, filters=filters ), 1 )
开发者ID:BenjaminHCCarr,项目名称:galaxy,代码行数:27,代码来源:test_HistoryContentsManager.py


示例8: test_mysql_innodb

    def test_mysql_innodb(self):
        """Test that table creation on mysql only builds InnoDB tables."""
        with mock.patch('manila.db.sqlalchemy.api.get_engine',
                        return_value=self.engine):
            self._walk_versions(snake_walk=False, downgrade=False)

        # sanity check
        sanity_check = """SELECT count(*)
                          FROM information_schema.tables
                          WHERE table_schema = :database;"""
        total = self.engine.execute(
            text(sanity_check),
            database=self.engine.url.database)

        self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")

        noninnodb_query = """
            SELECT count(*)
            FROM information_schema.TABLES
            WHERE table_schema = :database
                AND engine != 'InnoDB'
                AND table_name != 'alembic_version';"""

        count = self.engine.execute(
            text(noninnodb_query),
            database=self.engine.url.database
        ).scalar()
        self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
开发者ID:jcsp,项目名称:manila,代码行数:28,代码来源:test_migration.py


示例9: has_table

 def has_table(self, connection, table_name, schema=None):
     # seems like case gets folded in pg_class...
     if schema is None:
         cursor = connection.execute(
             sql.text(
             "select relname from pg_class c join pg_namespace n on "
             "n.oid=c.relnamespace where n.nspname=current_schema() and "
             "lower(relname)=:name",
             bindparams=[
                     sql.bindparam('name', unicode(table_name.lower()),
                     type_=sqltypes.Unicode)]
             )
         )
     else:
         cursor = connection.execute(
             sql.text(
             "select relname from pg_class c join pg_namespace n on "
             "n.oid=c.relnamespace where n.nspname=:schema and "
             "lower(relname)=:name",
                 bindparams=[
                     sql.bindparam('name', 
                     unicode(table_name.lower()), type_=sqltypes.Unicode),
                     sql.bindparam('schema', 
                     unicode(schema), type_=sqltypes.Unicode)] 
             )
         )
     return bool(cursor.first())
开发者ID:advatar,项目名称:thevault,代码行数:27,代码来源:base.py


示例10: createServiceRequest

def createServiceRequest():
	if request.method == 'POST':
		sql = text('''INSERT INTO service_request(client_username, address, title, description, tag)
						VALUES(:user, :address, :title, :description, :tag);''')
		db.engine.execute(sql, user=session["user"], address=request.form.get('address'), title=request.form.get('title'),\
							description=request.form.get('description'), tag=request.form.get('tag'))
		sql2 = text('''SELECT service_id FROM service_request WHERE client_username=:user AND address=:address AND
						title=:title AND description=:description;''')
		result = db.engine.execute(sql2, user=session["user"], address=request.form.get('address'), title=request.form.get('title'),\
							description=request.form.get('description'))
		idnum = result.fetchone()
		days = ['mo', 'tu', 'we', 'th', 'fr', 'sa', 'su']
		for day in days:
			times = request.form.getlist(day)
			for time in times:
				sql = text('''INSERT INTO service_schedule(service_id,day,hour) VALUES(:id,:day,:hour);''')
				db.engine.execute(sql, id=idnum[0], day=day, hour=time)
		flash('Service request created.')
		return redirect('/requests/'+str(idnum[0])+'/workers')
	if not session.get('user'):
		flash('Please login before creating requests.')
		return redirect('/')
	elif session['type'] == 'worker':
		flash('You must be a client to create service requests.')
		return redirect('/')
	else:
		sql = text('''SELECT tag FROM job_tag''')
		results = db.engine.execute(sql)
		tags = [res[0] for res in results]
		return render_template('createRequest.jade', tags=tags)
开发者ID:mibewh,项目名称:helpingHand,代码行数:30,代码来源:request.py


示例11: getAltitudesChilds

def getAltitudesChilds(connection, cd_ref):
    # construction du select  de la requete a partir des cles de la table
    sql = """
        SELECT label_altitude
        FROM atlas.bib_altitudes
        ORDER BY altitude_min
    """
    qalt = connection.execute(text(sql))
    alt = [k[0] for k in qalt]

    sumSelect = ', '.join(
        "SUM({}) AS {}".format(k, k) for k in alt
    )

    sql = """
        SELECT {sumSelect}
        FROM atlas.vm_altitudes alt
        WHERE
            alt.cd_ref IN (
                SELECT * FROM atlas.find_all_taxons_childs(:thiscdref)
            ) OR alt.cd_ref = :thiscdref
    """.format(sumSelect=sumSelect)
    mesAltitudes = connection.execute(text(sql), thiscdref=cd_ref)

    altiList = list()
    for a in mesAltitudes:
        for k in alt:
            temp = {"altitude": k.replace('_', '-')[1:], "value": getattr(a, k)}
            altiList.append(temp)

    return altiList
开发者ID:PnEcrins,项目名称:GeoNature-atlas,代码行数:31,代码来源:vmAltitudesRepository.py


示例12: get_unique_constraints

    def get_unique_constraints(self, connection, table_name, schema=None, **kw):
        # Same as get_indexes except only for "unique"=2
        table_id = self.get_table_id(connection, table_name, schema,
                                     info_cache=kw.get("info_cache"))

        # unique=2 -> unique constraint
        INDEX_SQL = text("""
          SELECT i.index_id as index_id, i.index_name AS name
          FROM sys.sysidx i join sys.systab t on i.table_id=t.table_id
          WHERE t.table_id = :table_id and i.index_category = 3 and i."unique"=2
        """)

        results = connection.execute(INDEX_SQL, table_id=table_id)
        indexes = []
        for r in results:
            INDEXCOL_SQL = text("""
             select tc.column_name as col
             FROM sys.sysidxcol ic
             join sys.systabcol tc on (ic.table_id=tc.table_id and ic.column_id=tc.column_id)
             WHERE ic.index_id = :index_id and ic.table_id = :table_id
             ORDER BY ic.sequence ASC
            """)
            idx_cols = connection.execute(INDEXCOL_SQL, index_id=r["index_id"],
                                          table_id=table_id)
            column_names = [ic["col"] for ic in idx_cols]
            index_info = {"name": r["name"],
                          "column_names": column_names}
            indexes.append(index_info)

        return indexes       
开发者ID:sqlanywhere,项目名称:sqlalchemy-sqlany,代码行数:30,代码来源:base.py


示例13: get_pk_constraint

    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        table_id = self.get_table_id(connection, table_name, schema,
                                     info_cache=kw.get("info_cache"))

        # index_category=1 -> primary key
        PK_SQL = text("""
          SELECT t.table_name AS table_name, i.index_id as index_id,
                 i.index_name AS name
          FROM sys.sysidx i join sys.systab t on i.table_id=t.table_id
          WHERE t.table_id = :table_id and i.index_category = 1
        """)

        results = connection.execute(PK_SQL, table_id=table_id)
        pks = results.fetchone()
        results.close()

        if not pks:
            return {"constrained_columns": [],
                    "name": None}

        PKCOL_SQL = text("""
             select tc.column_name as col
             FROM sys.sysidxcol ic
             join sys.systabcol tc on (ic.table_id=tc.table_id and ic.column_id=tc.column_id)
             WHERE ic.index_id = :index_id and ic.table_id = :table_id
            """)
        pk_cols = connection.execute(PKCOL_SQL, index_id=pks["index_id"],
                                     table_id=table_id )
        column_names = [pkc["col"] for pkc in pk_cols]
        return {"constrained_columns": column_names,
                "name": pks["name"]}
开发者ID:sqlanywhere,项目名称:sqlalchemy-sqlany,代码行数:31,代码来源:base.py


示例14: get_indexes

    def get_indexes(self, connection, table_name, schema=None, **kw):
        table_id = self.get_table_id(connection, table_name, schema,
                                     info_cache=kw.get("info_cache"))

        # index_category=3 -> not primary key, not foreign key, not text index
        # unique=1 -> unique index, 2 -> unique constraint, 5->unique index with
        # nulls not distinct
        INDEX_SQL = text("""
          SELECT i.index_id as index_id, i.index_name AS name,
                 if i."unique" in (1,2,5) then 1 else 0 endif AS "unique"
          FROM sys.sysidx i join sys.systab t on i.table_id=t.table_id
          WHERE t.table_id = :table_id and i.index_category = 3
        """)

        results = connection.execute(INDEX_SQL, table_id=table_id)
        indexes = []
        for r in results:
            INDEXCOL_SQL = text("""
             select tc.column_name as col
             FROM sys.sysidxcol ic
             join sys.systabcol tc on (ic.table_id=tc.table_id and ic.column_id=tc.column_id)
             WHERE ic.index_id = :index_id and ic.table_id = :table_id
             ORDER BY ic.sequence ASC
            """)
            idx_cols = connection.execute(INDEXCOL_SQL, index_id=r["index_id"],
                                          table_id=table_id)
            column_names = [ic["col"] for ic in idx_cols]
            index_info = {"name": r["name"],
                          "unique": bool(r["unique"]),
                          "column_names": column_names}
            indexes.append(index_info)

        return indexes
开发者ID:sqlanywhere,项目名称:sqlalchemy-sqlany,代码行数:33,代码来源:base.py


示例15: upgrade_6_0_to_6_1_plugins_cluster_attrs_use_ids_mapping

def upgrade_6_0_to_6_1_plugins_cluster_attrs_use_ids_mapping(connection):
    """Convert plugin version mapping to plugin ids

    In Fuel 6.0 we had plugin version in cluster attributes
    to identify which plugin should be enabled or disabled.
    In 6.1 release we have plugins updates feature, it means
    that a single plugin can be updated/overwritten with newer
    version. For example 1.0.0 can be replaced with 1.0.1.
    As result we cannot rely on versions anymore, here we
    convert version mapping to plugin ids.

    See blueprint:
    https://blueprints.launchpad.net/fuel/+spec/plugins-security-fixes-delivery
    """
    select_attrs = text("""SELECT id, editable FROM attributes""")

    select_plugins = text(
        """SELECT id FROM plugins
        WHERE name = :plugin_name AND
        version = :plugin_version""")

    update_attrs = text(
        """UPDATE attributes
        SET editable = :editable
        WHERE id = :id""")

    attrs_list = connection.execute(select_attrs)
    for raw_attrs in attrs_list:
        attr_id = raw_attrs[0]
        attrs = jsonutils.loads(raw_attrs[1])

        for key, attr in six.iteritems(attrs):
            metadata = attr.get('metadata', {})
            plugin_version = metadata.get('plugin_version')
            if not plugin_version:
                continue

            plugin_name = key

            # If there is no plugin with such version
            # and name, it means that something was wrong
            # and somebody deleted the plugin from database
            # we must not fail migration in this case
            plugin_id = None

            plugins = list(connection.execute(
                select_plugins,
                plugin_name=plugin_name,
                plugin_version=plugin_version))

            if plugins:
                plugin_id = plugins[0][0]

            del attr['metadata']['plugin_version']
            attr['metadata']['plugin_id'] = plugin_id

        connection.execute(
            update_attrs,
            editable=jsonutils.dumps(attrs),
            id=attr_id)
开发者ID:katepimenova,项目名称:fuel-web,代码行数:60,代码来源:migration.py


示例16: has_sequence

    def has_sequence(self, connection, sequence_name, schema=None):
        if schema is None:
            cursor = connection.execute(
                sql.text(
                    "SELECT relname FROM pg_class c join pg_namespace n on "
                    "n.oid=c.relnamespace where relkind='S' and "
                    "n.nspname=current_schema() "
                    "and lower(relname)=:name",
                    bindparams=[
                        sql.bindparam('name', unicode(sequence_name.lower()),
                        type_=sqltypes.Unicode)
                    ] 
                )
            )
        else:
            cursor = connection.execute(
                sql.text(
                "SELECT relname FROM pg_class c join pg_namespace n on "
                "n.oid=c.relnamespace where relkind='S' and "
                "n.nspname=:schema and lower(relname)=:name",
                bindparams=[
                    sql.bindparam('name', unicode(sequence_name.lower()),
                     type_=sqltypes.Unicode),
                    sql.bindparam('schema', 
                                unicode(schema), type_=sqltypes.Unicode)
                ]
            )
            )

        return bool(cursor.first())
开发者ID:advatar,项目名称:thevault,代码行数:30,代码来源:base.py


示例17: upgrade_ubuntu_cobbler_profile_6_0_to_6_1

def upgrade_ubuntu_cobbler_profile_6_0_to_6_1(connection):
    select_query = text("SELECT id, generated FROM attributes")
    update_query = text(
        "UPDATE attributes SET generated = :generated WHERE id = :attr_id")
    for attr_id, generated in connection.execute(select_query):
        attrs = jsonutils.loads(generated)
        if attrs['cobbler']['profile'] == 'ubuntu_1204_x86_64':
            attrs['cobbler']['profile'] = 'ubuntu_1404_x86_64'
            connection.execute(
                update_query,
                generated=jsonutils.dumps(attrs),
                attr_id=attr_id)

    select_query = text("SELECT id, attributes_metadata FROM releases")
    update_query = text(
        "UPDATE releases SET attributes_metadata = :attrs_meta"
        " WHERE id = :release_id")
    for release_id, attributes_metadata in connection.execute(select_query):
        attrs = jsonutils.loads(attributes_metadata)
        if attrs['generated']['cobbler']['profile']['generator_arg'] == \
                'ubuntu_1204_x86_64':
            attrs['generated']['cobbler']['profile']['generator_arg'] = \
                'ubuntu_1404_x86_64'
            connection.execute(
                update_query,
                attrs_meta=jsonutils.dumps(attrs),
                release_id=release_id)
开发者ID:katepimenova,项目名称:fuel-web,代码行数:27,代码来源:migration.py


示例18: sources2removals

def sources2removals(source_list, suite_id, session):
    """Compute removals items given a list of names of source packages

    @type source_list: list
    @param source_list: A list of names of source packages

    @type suite_id: int
    @param suite_id: The id of the suite from which these sources should be removed

    @type session: SQLA Session
    @param session: The database session in use

    @rtype: list
    @return: A list of items to be removed to remove all sources and their binaries from the given suite
    """
    to_remove = []
    params = {"suite_id": suite_id, "sources": tuple(source_list)}
    q = session.execute(sql.text("""
                    SELECT s.source, s.version, 'source', s.id
                    FROM source s
                         JOIN src_associations sa ON sa.source = s.id
                    WHERE sa.suite = :suite_id AND s.source IN :sources"""), params)
    to_remove.extend(q)
    q = session.execute(sql.text("""
                    SELECT b.package, b.version, a.arch_string, b.id
                    FROM binaries b
                         JOIN bin_associations ba ON b.id = ba.bin
                         JOIN architecture a ON b.architecture = a.id
                         JOIN source s ON b.source = s.id
                    WHERE ba.suite = :suite_id AND s.source IN :sources"""), params)
    to_remove.extend(q)
    return to_remove
开发者ID:Debian,项目名称:dak,代码行数:32,代码来源:auto_decruft.py


示例19: submit_recording

def submit_recording(connection, data):
    data_json = json.dumps(data, sort_keys=True, separators=(',', ':'))
    data_sha256 = sha256(data_json.encode("utf-8")).hexdigest()

    meta = {"artist": data["artist"], "title": data["title"]}
    meta_json = json.dumps(meta, sort_keys=True, separators=(',', ':'))
    meta_sha256 = sha256(meta_json.encode("utf-8")).hexdigest()

    artist = get_artist_credit(connection, data["artist"])
    if not artist:
        artist = add_artist_credit(connection, data["artist"])
    if "release" in data:
        release = get_release(connection, data["release"])
        if not release:
            release = add_release(connection, data["release"])
    else:
        release = None
    query = text("""INSERT INTO recording_json (data, data_sha256, meta_sha256)
                   VALUES (:data, :data_sha256, :meta_sha256)
                RETURNING id""")
    result = connection.execute(query, {"data": data_json,
                                           "data_sha256": data_sha256,
                                           "meta_sha256": meta_sha256})
    id = result.fetchone()["id"]
    gid = str(uuid.uuid4())
    query = text("""INSERT INTO recording (gid, data, artist, release, submitted)
                    VALUES (:gid, :data, :artist, :release, now())""")
    connection.execute(query, {"gid": gid,
                                  "data": id,
                                  "artist": artist,
                                  "release": release})

    return gid
开发者ID:Freso,项目名称:messybrainz-server,代码行数:33,代码来源:data.py


示例20: main

def main(url = Config.get("URL", "ships"), debug=False):
    req = urllib2.Request(url)
    req.add_header('User-Agent', useragent)
    stats = urllib2.urlopen(req).read()
    session.execute(Ship.__table__.delete())
    if Config.get("DB", "dbms") == "mysql":
        session.execute(text("ALTER TABLE ships AUTO_INCREMENT=1;", bindparams=[false]))
    else:
        session.execute(text("SELECT setval('ships_id_seq', 1, :false);", bindparams=[false]))
    
    for line in sre.findall(stats):
        ship = Ship()
        line = list(line)
        for index, key in enumerate(keys):
            if line[index] in mapping:
                line[index] = mapping[line[index]]
            elif line[index].isdigit():
                line[index] = int(line[index])
            if line[index] not in ('-', '',):
                setattr(ship,key,line[index])
        ship.total_cost = ship.metal + ship.crystal + ship.eonium
        if debug: print "%12s%12s%12s%12s" % (ship.name, ship.class_, ship.race, ship.type,)
        
        session.add(ship)
    
    session.commit()
    session.close()
开发者ID:TBBTNut,项目名称:merlin,代码行数:27,代码来源:shipstats.py



注:本文中的sqlalchemy.sql.text函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sql.true函数代码示例发布时间:2022-05-27
下一篇:
Python sql.table函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap