• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python expression.select函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.sql.expression.select 函数的典型用法代码示例。如果您正苦于以下问题:Python select 函数的具体用法是什么?Python select 怎么用?有哪些 Python select 的使用示例?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了select函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: upgrade

def upgrade(migrate_engine):
    """Create the foreign keys described by ``TABLES``.

    Each ``TABLES`` entry is ``(table_name, ref, child)``.  Before the
    constraint is created, rows whose referencing column has no match in
    the referenced column are handed to ``dump_cleanup_rows``; when a
    ``child`` is given, the dependent rows of the child table are handled
    first.  The migration is a no-op on SQLite.

    :param migrate_engine: engine the migration runs against; its ``name``
        attribute selects dialect-specific behaviour.
    """
    dialect = migrate_engine.name
    if dialect == 'sqlite':
        return

    meta = MetaData(bind=migrate_engine)
    for table_name, ref, child in TABLES:
        table = Table(table_name, meta, autoload=True)
        column_name, ref_table_name, ref_column_name = ref
        column = table.c[column_name]

        ref_table = Table(ref_table_name, meta, autoload=True)
        ref_column = ref_table.c[ref_column_name]

        # ``!= None`` is deliberate: on a column it builds an SQL
        # "IS NOT NULL" expression rather than a Python boolean.
        valid_refs = select([ref_column]).where(ref_column != None)  # noqa: E711

        if child:
            # Rows of the child table that hang off orphaned parent rows
            # have to go before the parent rows themselves.
            child_table_name, child_column_name, child_ref_column_name = child
            child_table = Table(child_table_name, meta, autoload=True)
            child_column = child_table.c[child_column_name]
            child_ref_column = table.c[child_ref_column_name]

            orphaned_parents = select([child_ref_column]).where(
                ~column.in_(valid_refs))
            dump_cleanup_rows(migrate_engine, meta, child_table,
                              child_column.in_(orphaned_parents))

        dump_cleanup_rows(migrate_engine, meta, table,
                          ~column.in_(valid_refs))

        fk_args = dict(columns=[column], refcolumns=[ref_column])
        if dialect == 'mysql':
            # Only MySQL gets an explicit constraint name.
            fk_args['name'] = "_".join(('fk', table_name, column_name))
        ForeignKeyConstraint(**fk_args).create()
开发者ID:BATYD-Turksat,项目名称:nova_servo,代码行数:35,代码来源:209_add_missing_foreign_keys.py


示例2: test_in_25

 def test_in_25(self):
     """IN against a scalar subquery in the columns clause is rendered with an anonymous label."""
     scalar_subquery = select([self.table2.c.otherid]).as_scalar()
     membership = self.table1.c.myid.in_(scalar_subquery)
     expected_sql = (
         "SELECT mytable.myid IN (SELECT myothertable.otherid "
         "FROM myothertable) AS anon_1 FROM mytable"
     )
     self.assert_compile(select([membership]), expected_sql)
开发者ID:e0ne,项目名称:sqlalchemy,代码行数:7,代码来源:test_operators.py


示例3: test_nested_in

 def test_nested_in(self):
     """A tagged select nested inside ``IN (...)`` renders only the outer comment."""
     from sqlalchemy.sql.expression import select
     from sqlalchemy.schema import Table, MetaData, Column
     from sqlalchemy.types import Integer

     foo = Table(u'Foo', MetaData(), Column(u'a', Integer()))

     # Inner statement carries its own comment, which must not show up in
     # the rendered text asserted below.
     inner_tagged = self._makeOne(
         select([u'a'], from_obj=foo),
         comment=u'inner'
         )
     outer_select = select(
         [u'a'],
         from_obj=foo,
         whereclause=foo.c.a.in_(inner_tagged)
         )
     target = self._makeOne(outer_select, comment=u'test')

     compiled = target.compile()
     rendered = unicode(compiled).replace(u'\n', '')
     self.assertEqual(
         rendered,
         u'SELECT a FROM "Foo" WHERE "Foo".a IN (SELECT a FROM "Foo") /* test */'
         )
开发者ID:moriyoshi,项目名称:sqlalchemy_querytagger,代码行数:28,代码来源:tests.py


示例4: test_migrate_batch_stureg

    def test_migrate_batch_stureg(self):
        """Migrate a student-registration snapshot batch and check the row counts.

        After ``migrate_batch`` the destination table must hold the rows it
        had before, minus those matching academic year 2015, plus the rows
        present in the source table.
        """
        def first_value(conn, query):
            # First column of the first row returned by the query.
            return conn.execute(query).fetchall()[0][0]

        batch_guid = '2bb942b9-75cf-4055-a67a-8b9ab53a9dfc'
        batch = {
            UdlStatsConstants.REC_ID: '6',
            UdlStatsConstants.BATCH_GUID: batch_guid,
            UdlStatsConstants.TENANT: self.__tenant,
            UdlStatsConstants.SCHEMA_NAME: None,
            Constants.DEACTIVATE: False,
            UdlStatsConstants.LOAD_TYPE: LoadType.STUDENT_REGISTRATION,
            UdlStatsConstants.BATCH_OPERATION: 's',
            UdlStatsConstants.SNAPSHOT_CRITERIA: '{"reg_system_id": "015247bd-058c-48cd-bb4d-f6cffe5b40c1", "academic_year": 2015}',
        }
        self.insert_into_udl_stats(batch[UdlStatsConstants.REC_ID], batch_guid,
                                   self.__tenant, batch[UdlStatsConstants.LOAD_TYPE])

        # Rows available in the source (pre-production) table.
        source_conn = EdMigrateSourceConnection(tenant=get_unittest_preprod_tenant_name())
        source_count_query = select([func.count()]).select_from(
            source_conn.get_table(Constants.STUDENT_REG))
        rows_to_insert = first_value(source_conn, source_count_query)
        self.assertEqual(10, rows_to_insert)

        # NOTE(review): the destination connection is also opened with the
        # *preprod* tenant name -- confirm this is intended.
        dest_conn = EdMigrateDestConnection(tenant=get_unittest_preprod_tenant_name())
        dest_table = dest_conn.get_table(Constants.STUDENT_REG)
        dest_count_query = select([func.count()]).select_from(dest_table)
        rows_before = first_value(dest_conn, dest_count_query)
        self.assertEqual(2581, rows_before)

        # Rows matching the academic year named in the snapshot criteria.
        snapshot_count_query = select(
            [func.count()], dest_table.c.academic_year == 2015).select_from(dest_table)
        rows_to_delete = first_value(dest_conn, snapshot_count_query)
        self.assertEqual(1217, rows_to_delete)

        self.assertTrue(migrate_batch(batch))

        expected_rows_after = rows_before - rows_to_delete + rows_to_insert
        rows_after = first_value(dest_conn, dest_count_query)
        self.assertEqual(expected_rows_after, rows_after)
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:31,代码来源:test_migrate.py


示例5: test_union

 def test_union(self):
     """Check that a tagged select keeps its comment after ``union()``.

     The compiled text, with newlines removed, must match a UNION of both
     selects followed by the ``/* test */`` comment.
     """
     from sqlalchemy.sql.expression import select
     from sqlalchemy.schema import Table, MetaData, Column
     from sqlalchemy.types import Integer
     meta = MetaData()
     table = Table(
         u'Foo',
         meta,
         Column(u'a', Integer())
         )
     # Only the first select is wrapped (and therefore tagged); the second
     # one is a plain select passed to union().
     target = self._makeOne(
         select(
             [u'a'],
             from_obj=table,
             whereclause=(table.c.a == 1)
             ),
         comment=u'test'
         )
     target = target.union(
         select(
             [u'a'],
             from_obj=table,
             whereclause=(table.c.a == 2)
             )
         )
     result = target.compile()
     # The pattern used to be a ur'' literal, which Python 3 rejects with a
     # SyntaxError before any test in the module can run.  A u'' literal
     # with doubled backslashes has exactly the same value on Python 2.
     # ``[^ ]+`` stands for the rendered bind parameter of each comparison.
     expected_pattern = (
         u'SELECT a FROM "Foo" WHERE "Foo".a = [^ ]+ '
         u'UNION SELECT a FROM "Foo" WHERE "Foo".a = [^ ]+ /\\* test \\*/'
         )
     self.assertRegexpMatches(unicode(result).replace(u'\n', ''), expected_pattern)
开发者ID:moriyoshi,项目名称:sqlalchemy_querytagger,代码行数:27,代码来源:tests.py


示例6: test_migrate_student_reg

    def test_migrate_student_reg(self):
        """Migrate ``student_reg`` and verify ten records arrive in the destination.

        The source query must return record ids 15541..15550, the destination
        must contain none of them before ``migrate_table`` runs and all ten
        afterwards.
        """
        Unittest_with_edcore_sqlite.setUpClass(
            EdMigrateDestConnection.get_datasource_name(TestMigrate.test_tenant),
            use_metadata_from_db=False)
        source_conn = EdMigrateSourceConnection(tenant=get_unittest_preprod_tenant_name())
        dest_conn = EdMigrateDestConnection(tenant=get_unittest_prod_tenant_name())
        batch_guid = "0aa942b9-75cf-4055-a67a-8b9ab53a9dfc"

        student_reg = source_conn.get_table(Constants.STUDENT_REG)
        rec_id = student_reg.c.student_reg_rec_id
        ids_query = select([rec_id]).order_by(rec_id)
        count_query = select([func.count().label('student_reg_rec_ids')],
                             rec_id.in_(range(15541, 15551)))

        def assert_dest_count(expected):
            # Run the count query on the destination and compare the result.
            result = dest_conn.execute(count_query)
            counted = result.fetchone()
            self.assertEqual(expected, counted['student_reg_rec_ids'])
            result.close()

        source_result = source_conn.execute(ids_query)
        source_rows = source_result.fetchall()
        self.assertEqual(10, len(source_rows))
        self.assertListEqual([(rec,) for rec in range(15541, 15551)], source_rows)
        source_result.close()

        assert_dest_count(0)

        delete_count, insert_count = migrate_table(
            batch_guid, None, source_conn, dest_conn, 'student_reg', False)
        self.assertEqual(0, delete_count)
        self.assertEqual(10, insert_count)

        assert_dest_count(10)
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:31,代码来源:test_migrate.py


示例7: create_expression

 def create_expression(self):
     """Build the SELECT statement for the next read from ``self.table``.

     All table columns are selected, ordered by the ``self.sort_key``
     columns (each passed through ``safe_collate``).  The WHERE clause
     depends on whether a previous result exists:

     * no ``self.last_result``: ``self.filter``, AND-ed with
       ``self.starter`` when one is set;
     * otherwise: ``self.filter`` AND-ed with a ``vector_greater_than``
       comparison of the sort-key columns against the values of the last
       result at ``self.sort_index``.  ``self.starter`` is not applied in
       this branch.

     ``self.limit_per`` (when not None) becomes the LIMIT, and
     ``self.stream`` turns on the ``stream_results`` execution option.

     Returns the resulting select expression.
     """
     columns = self.table.columns
     ordering = [safe_collate(columns[nm], None) for nm in self.sort_key]
     expr = select(columns).order_by(*ordering)

     # The former ``try: ... except: raise`` wrappers only re-raised and
     # have been dropped; exceptions propagate exactly as before.
     if self.last_result is None:
         if self.starter is not None:
             condition = and_(self.starter, self.filter)
         else:
             condition = self.filter
     else:
         resume_after = vector_greater_than(
             [columns[nm] for nm in self.sort_key],
             [self.last_result[n] for n in self.sort_index])
         condition = and_(resume_after, self.filter)
     expr = expr.where(condition)

     if self.limit_per is not None:
         expr = expr.limit(self.limit_per)

     if self.stream:
         expr = expr.execution_options(stream_results=True)

     return expr
开发者ID:jcrudy,项目名称:oreader,代码行数:26,代码来源:reader_configs.py


示例8: get_maps

def get_maps(document, lang):
    """Return the topo maps that intersect the geometry of ``document``.

    A map matches when its detailed geometry intersects either the
    document's ``geom`` or its ``geom_detail``.  Maps that redirect to
    another document are excluded.  An empty list is returned when the
    document has no geometry.

    When ``lang`` is not None, ``set_best_locale`` is applied to the
    result before it is returned.
    """
    if document.geometry is None:
        return []

    # Scalar subqueries yielding the two geometries stored for the document.
    geom_subquery = (
        select([DocumentGeometry.geom])
        .where(DocumentGeometry.document_id == document.document_id))
    geom_detail_subquery = (
        select([DocumentGeometry.geom_detail])
        .where(DocumentGeometry.document_id == document.document_id))

    intersects_document = or_(
        DocumentGeometry.geom_detail.ST_Intersects(
            geom_subquery.label('t1')),
        DocumentGeometry.geom_detail.ST_Intersects(
            geom_detail_subquery.label('t2')))

    map_query = (
        DBSession.query(TopoMap)
        .filter(TopoMap.redirects_to.is_(None))
        .join(
            DocumentGeometry,
            TopoMap.document_id == DocumentGeometry.document_id)
        .options(load_only(
            TopoMap.document_id, TopoMap.editor, TopoMap.code,
            TopoMap.version, TopoMap.protected))
        .options(joinedload(TopoMap.locales).load_only(
            DocumentLocale.lang, DocumentLocale.title,
            DocumentLocale.version))
        .filter(intersects_document))
    topo_maps = map_query.all()

    if lang is not None:
        set_best_locale(topo_maps, lang)

    return topo_maps
开发者ID:arnaud-morvan,项目名称:v6_api,代码行数:35,代码来源:topo_map.py


示例9: _connect_ping_listener

def _connect_ping_listener(connection, branch):
    """Ping the server at connection startup.

    Ping the server at transaction begin and transparently reconnect
    if a disconnect exception occurs.
    """
    if branch:
        return

    # turn off "close with result".  This can also be accomplished
    # by branching the connection, however just setting the flag is
    # more performant and also doesn't get involved with some
    # connection-invalidation awkardness that occurs (see
    # https://bitbucket.org/zzzeek/sqlalchemy/issue/3215/)
    save_should_close_with_result = connection.should_close_with_result
    connection.should_close_with_result = False
    try:
        # run a SELECT 1.   use a core select() so that
        # any details like that needed by Oracle, DB2 etc. are handled.
        connection.scalar(select([1]))
    except exception.DBConnectionError:
        # catch DBConnectionError, which is raised by the filter
        # system.
        # disconnect detected.  The connection is now
        # "invalid", but the pool should be ready to return
        # new connections assuming they are good now.
        # run the select again to re-validate the Connection.
        connection.scalar(select([1]))
    finally:
        connection.should_close_with_result = save_should_close_with_result
开发者ID:SvenDowideit,项目名称:clearlinux,代码行数:30,代码来源:session.py


示例10: query

    def query(self, parent_ids=None):
        """ Build and run the SQL query for this level of the request.

        The root level (no parent) selects from its own alias and applies
        offset/limit; nested levels select from the parent's alias and are
        restricted to ``parent_ids`` when given.  Every fetched row is
        passed to ``self.collect`` and its ``self.pk_id`` value is handed
        down to the child levels, which are queried recursively.
        """
        is_root = self.parent is None
        source_alias = self.alias if is_root else self.parent.alias
        q = select(from_obj=self.join(source_alias))
        if is_root:
            q = q.offset(self.get_child_node_value('offset', 0))
            if self.node.as_list:
                q = q.limit(self.get_child_node_value('limit', 10))
            else:
                # A non-list node yields a single record.
                q = q.limit(1)
        q = self.filter(q)

        if parent_ids is not None:
            q = q.where(self.parent.alias.c.id.in_(parent_ids))

        q = self.project(q).distinct()

        ids = []
        rp = db.session.execute(q)
        row = rp.fetchone()
        while row is not None:
            record = dict(row.items())
            ids.append(record.get(self.pk_id))
            self.collect(record)
            row = rp.fetchone()

        for _name, child in self.children.items():
            child.query(parent_ids=ids)
开发者ID:01-,项目名称:grano,代码行数:33,代码来源:__init__.py


示例11: get_asmt_rec_id

def get_asmt_rec_id(guid_batch, tenant_name, asmt_rec_id_info):
    '''
    Returns the record id of the assessment that belongs to a batch.
    Steps:
    1. Read the assessment guid for the batch from the source (integration) table
    2. Select the record id from the target table by that guid and the batch guid

    @param guid_batch: guid of the batch being processed
    @param tenant_name: tenant whose target database is queried
    @param asmt_rec_id_info: dict with the keys 'source_table',
        'guid_column_in_source', 'target_table', 'guid_column_name' and 'rec_id'
    @return: the record id, or None when either lookup returns no row
        (previously this case failed with UnboundLocalError)
    '''
    source_table_name = asmt_rec_id_info['source_table']
    guid_column_name_in_source = asmt_rec_id_info['guid_column_in_source']
    target_table_name = asmt_rec_id_info['target_table']
    guid_column_name_in_target = asmt_rec_id_info['guid_column_name']
    rec_id_column_name = asmt_rec_id_info['rec_id']

    # connect to integration table, to get the value of the assessment guid
    with get_udl_connection() as udl_conn:
        int_table = udl_conn.get_table(source_table_name)
        query = select([int_table.c[guid_column_name_in_source]], from_obj=int_table, limit=1)
        query = query.where(int_table.c['guid_batch'] == guid_batch)
        results = udl_conn.get_result(query)
        if not results:
            # Nothing recorded for this batch: there is no guid to look up.
            return None
        guid_column_value = results[0][guid_column_name_in_source]

    # connect to target table, to get the value of the record id
    with get_target_connection(tenant_name, guid_batch) as target_conn:
        dim_asmt = target_conn.get_table(target_table_name)
        query = select([dim_asmt.c[rec_id_column_name]], from_obj=dim_asmt, limit=1)
        query = query.where(dim_asmt.c[guid_column_name_in_target] == guid_column_value)
        # Successive where() calls are AND-ed together.
        query = query.where(dim_asmt.c['batch_guid'] == guid_batch)
        results = target_conn.get_result(query)
        if not results:
            return None
        return results[0][rec_id_column_name]
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:33,代码来源:move_to_target.py


示例12: generate_create_inheritance_view_statement

def generate_create_inheritance_view_statement(class_):
    """Build the ``CreateView`` statement for the inheritance view of ``class_``.

    The view name is the ``cls2tbl`` name without its first character.
    Its columns are those of the class table plus those of every ancestor
    table, with the first column seen for a given name winning.

    For single-table inheritance the view selects from the class table
    only, filtered to the type names of the class and all its subclasses.
    Otherwise it selects from the class table joined to every ancestor
    table on ``id``.
    """
    viewname = cls2tbl(class_)[1:]
    own_table = class_.__table__
    settings = class_.__score_db__
    columns_by_name = {}

    def remember_columns(table):
        # Keep the first column registered under each name.
        for column in table.c:
            columns_by_name.setdefault(column.name, column)

    remember_columns(own_table)
    joined_tables = own_table
    if settings['inheritance'] is not None:
        ancestor = settings['parent']
        while ancestor:
            ancestor_table = ancestor.__table__
            joined_tables = joined_tables.join(
                ancestor_table,
                onclause=ancestor_table.c.id == own_table.c.id)
            remember_columns(ancestor_table)
            ancestor = ancestor.__score_db__['parent']

    if settings['inheritance'] == 'single-table':
        type_column = getattr(class_, settings['type_column'])
        # Depth-first, pre-order walk over the class and its subclasses.
        type_names = []
        pending = [class_]
        while pending:
            current = pending.pop()
            type_names.append(current.__score_db__['type_name'])
            pending.extend(reversed(current.__subclasses__()))
        viewselect = select(columns_by_name.values(),
                            from_obj=own_table,
                            whereclause=type_column.in_(type_names))
    else:
        viewselect = select(columns_by_name.values(), from_obj=joined_tables)
    return CreateView(viewname, viewselect)
开发者ID:score-framework,项目名称:py.db,代码行数:32,代码来源:_sa_stmt.py


示例13: set_leg_waypoints

def set_leg_waypoints():
    """Fill ``leg_waypoints`` for legs that are not linked to waypoints yet.

    Device data points are matched to legs by device id and time range.
    For every leg that has no row in ``leg_waypoints`` and whose points
    all have a snapping time, one row per (leg, waypoint) is inserted
    together with the earliest point time.  The number of inserted rows
    and the elapsed time are printed.
    """
    started = time.time()

    tables = db.metadata.tables
    device_data = tables["device_data"]
    legs = tables["legs"]
    leg_waypoints = tables["leg_waypoints"]

    # Every device data point paired with the leg it falls into.
    points_in_legs = device_data.join(legs, and_(
        device_data.c.device_id == legs.c.device_id,
        device_data.c.time.between(legs.c.time_start, legs.c.time_end)))
    legpoints = select(
        [legs.c.id, device_data.c.waypoint_id, device_data.c.time,
         device_data.c.snapping_time],
        from_obj=points_in_legs).alias("legpoints")

    already_linked = select([leg_waypoints.c.leg], distinct=True)

    # Unlinked legs for which no point is still waiting to be snapped.
    fully_snapped = select(
        [legpoints.c.id],
        legpoints.c.id.notin_(already_linked),
        group_by=legpoints.c.id,
        having=func.bool_and(legpoints.c.snapping_time.isnot(None)))

    newitems = select(
        [legpoints.c.id, legpoints.c.waypoint_id, func.min(legpoints.c.time)],
        legpoints.c.id.in_(fully_snapped),
        group_by=[legpoints.c.id, legpoints.c.waypoint_id]).alias("newitems")

    insert_stmt = leg_waypoints.insert().from_select(
        ["leg", "waypoint", "first"], newitems)
    rowcount = db.engine.execute(insert_stmt).rowcount
    elapsed = time.time() - started
    print("set_leg_waypoints on %d rows in %.2g seconds" % (rowcount, elapsed))
开发者ID:aalto-trafficsense,项目名称:regular-routes-server,代码行数:28,代码来源:scheduler.py


示例14: constructQuery

 def constructQuery(self, context):
     """Return the user query backing this vocabulary for ``context``.

     If the (unproxied) context carries a value in ``self.value_field``,
     the query selects that single user.  Otherwise the context's parent
     is taken as the sitting, and the query selects the active members of
     the sitting's group that have no attendance record for the sitting.
     Results are ordered by last, first and middle name in both cases.
     """
     session = Session()
     trusted = removeSecurityProxy(context)
     name_order = (domain.User.last_name,
                   domain.User.first_name,
                   domain.User.middle_name)

     user_id = getattr(trusted, self.value_field, None)
     if user_id:
         selected_user = domain.User.user_id == user_id
         return session.query(domain.User).filter(
             selected_user).order_by(*name_order)

     sitting = trusted.__parent__
     group_id = sitting.group_id
     group_sitting_id = sitting.group_sitting_id

     memberships = schema.user_group_memberships
     attendance = schema.group_sitting_attendance
     # ``== True`` is deliberate: it builds an SQL comparison on the column.
     all_member_ids = sql.select(
         [memberships.c.user_id],
         sql.and_(
             memberships.c.group_id == group_id,
             memberships.c.active_p == True))  # noqa: E712
     attended_ids = sql.select(
         [attendance.c.member_id],
         attendance.c.group_sitting_id == group_sitting_id)

     not_yet_attended = sql.and_(
         domain.User.user_id.in_(all_member_ids),
         ~domain.User.user_id.in_(attended_ids))
     return session.query(domain.User).filter(
         not_yet_attended).order_by(*name_order)
开发者ID:BenoitTalbot,项目名称:bungeni-portal,代码行数:28,代码来源:vocabulary.py


示例15: p_is_term

 def p_is_term(p):
     '''is_term : OP_IS string'''
     # NOTE(review): the docstring above looks like a yacc-style grammar
     # production (PLY convention) that is read at runtime, so it is kept
     # verbatim and the documentation lives in comments instead.
     #
     # Translates an "is:<term>" search term into a filter expression on
     # the change table and stores it in p[0].  Supported terms: reviewed,
     # open, closed, submitted, merged, abandoned, owner, reviewer.  Any
     # other term raises SearchSyntaxError.
     #TODO: implement starred, watched, draft
     username = p.parser.username
     term = p[2]
     if term == 'reviewed':
         # Changes that have at least one approval with a non-zero value.
         filters = []
         filters.append(gertty.db.approval_table.c.change_key == gertty.db.change_table.c.key)
         filters.append(gertty.db.approval_table.c.value != 0)
         s = select([gertty.db.change_table.c.key], correlate=False).where(and_(*filters))
         p[0] = gertty.db.change_table.c.key.in_(s)
     elif term == 'open':
         p[0] = gertty.db.change_table.c.status.notin_(['MERGED', 'ABANDONED'])
     elif term == 'closed':
         p[0] = gertty.db.change_table.c.status.in_(['MERGED', 'ABANDONED'])
     elif term == 'submitted':
         p[0] = gertty.db.change_table.c.status == 'SUBMITTED'
     elif term == 'merged':
         p[0] = gertty.db.change_table.c.status == 'MERGED'
     elif term == 'abandoned':
         p[0] = gertty.db.change_table.c.status == 'ABANDONED'
     elif term == 'owner':
         # Changes whose account is the parser's current user.
         p[0] = and_(gertty.db.change_table.c.account_key == gertty.db.account_table.c.key,
                     gertty.db.account_table.c.username == username)
     elif term == 'reviewer':
         # Changes that have an approval made by the parser's current user.
         filters = []
         filters.append(gertty.db.approval_table.c.change_key == gertty.db.change_table.c.key)
         filters.append(gertty.db.approval_table.c.account_key == gertty.db.account_table.c.key)
         filters.append(gertty.db.account_table.c.username == username)
         s = select([gertty.db.change_table.c.key], correlate=False).where(and_(*filters))
         p[0] = gertty.db.change_table.c.key.in_(s)
     else:
         # This rule handles the "is:" operator; the message used to say
         # "has:", which pointed users at the wrong operator.
         raise gertty.search.SearchSyntaxError('Syntax error: is:%s is not supported' % term)
开发者ID:bradleyjones,项目名称:gertty,代码行数:32,代码来源:parser.py


示例16: test_within_distance

    def test_within_distance(self):
        """
        Because SDO_WITHIN_DISTANCE requires a spatial index for the geometry
        used as first parameter, the test geometries have to be inserted into
        tables, unlike for the other databases.

        Note that Oracle uses meters as the unit of the tolerance value for
        geodetic coordinate systems (like 4326)!
        """
        small_square = 'Polygon((-5 -5, 5 -5, 5 5, -5 5, -5 -5))'
        large_square = 'Polygon((-10 -10, 10 -10, 10 10, -10 10, -10 -10))'

        # test if SDO_functions._within_distance is called correctly:
        # (geometry, distance, expected number of matching spots)
        spot_cases = (
            ('POINT(0 0)', 0, 1),
            ('POINT(0 0)', 0.1, 1),
            ('POINT(9 9)', 100000, 0),
            (small_square, 0, 3),
            (large_square, 0, 4),
            (large_square, 200000, 5),
        )
        for geometry, distance, expected_count in spot_cases:
            matching = session.query(Spot).filter(
                functions._within_distance(Spot.spot_location, geometry, distance))
            eq_(matching.count(), expected_count)

        def assert_origin_within_itself(extra):
            # Select 1 from dual where POINT(0 0) lies within distance 0 of itself.
            condition = functions._within_distance('POINT(0 0)', 'POINT(0 0)', 0, extra)
            eq_(session.scalar(select([text('1')], from_obj=['dual']).where(condition)), 1)

        # test if SDO_GEOM.functions._within_distance is called correctly
        assert_origin_within_itself({'tol' : 0.00000005})
        assert_origin_within_itself({'dim1' : text(diminfo),
                                     'dim2' : text(diminfo)})
开发者ID:ComedianClown,项目名称:geoalchemy,代码行数:31,代码来源:test_oracle.py


示例17: test_six_pt_five

    def test_six_pt_five(self):
        """``x == 7 OR true()`` compiles to a bare true, natively or as ``1 = 1``."""
        x = column("x")

        def always_true_select():
            return select([x]).where(or_(x == 7, true()))

        self.assert_compile(always_true_select(), "SELECT x WHERE true")

        # Without native boolean support the constant is spelled "1 = 1".
        non_native_dialect = default.DefaultDialect(supports_native_boolean=False)
        self.assert_compile(always_true_select(),
                            "SELECT x WHERE 1 = 1",
                            dialect=non_native_dialect)
开发者ID:chundi,项目名称:sqlalchemy,代码行数:8,代码来源:test_operators.py


示例18: load_to_table

def load_to_table(data_dict, guid_batch, int_table, tenant_name, udl_schema):
    '''
    Insert one record built from data_dict into the integration table
    @param data_dict: the dictionary containing the data to be loaded
    @param guid_batch: the id for the batch
    @param int_table: the name of the integration table
    @param tenant_name: name of the tenant
    @param udl_schema: udl schema name
    @return: first element of the result of db_util.execute_udl_queries
    '''
    def quoted(raw):
        # SQL-quoted text of a value; non-str values go through str() first
        as_text = raw if type(raw) is str else str(raw)
        return QuotedString(as_text).getquoted().decode('utf-8')

    # select expression per target column, taken from ref_column_mapping
    mapped_expressions = {}
    with get_udl_connection() as conn:
        data_dict[mk.GUID_BATCH] = guid_batch
        data_dict = fix_empty_strings(data_dict)
        ref_table = conn.get_table('ref_column_mapping')
        s_int_table = conn.get_table(int_table)

        mapping_filter = and_(ref_table.c.source_table == 'lz_json',
                              ref_table.c.target_table == int_table)
        column_mapping_query = select([ref_table.c.target_column,
                                       ref_table.c.stored_proc_name],
                                      from_obj=ref_table).where(mapping_filter)
        for mapping in conn.get_result(column_mapping_query):
            target_column = mapping['target_column']
            stored_proc_name = mapping['stored_proc_name']
            value = data_dict.get(target_column)
            if not (value and stored_proc_name):
                # NOTE(review): the value is kept as-is (unquoted) here and is
                # later preferred over the quoted form -- confirm intended.
                mapped_expressions[target_column] = value
                continue
            if stored_proc_name.startswith('sp_'):
                # stored procedure call wrapping the quoted value
                mapped_expressions[target_column] = stored_proc_name + '(' + quoted(value) + ')'
            else:
                # otherwise the name is a format template taking 'value' and,
                # for str-typed columns, 'length'
                format_value = {'value': quoted(value)}
                column_type = s_int_table.c[target_column].type
                if column_type.python_type is str:
                    format_value['length'] = s_int_table.c[target_column].type.length
                mapped_expressions[target_column] = stored_proc_name.format(**format_value)

        record_sid = "nextval('{schema_name}.{tenant_sequence_name}')".format(
            schema_name=udl_schema,
            tenant_sequence_name=Constants.TENANT_SEQUENCE_NAME(tenant_name))
        insert_column_names = ['record_sid']
        insert_select_values = [record_sid]
        for column in s_int_table.c:
            value = data_dict.get(column.name)
            if value is None:
                continue
            insert_column_names.append(column.name)
            insert_select_values.append(
                mapped_expressions.get(column.name, quoted(value)))

        # create insert statement and execute
        insert_into_int_table = s_int_table.insert().from_select(
            insert_column_names, select(insert_select_values))
        affected_row = db_util.execute_udl_queries(conn, [insert_into_int_table],
                                                   'Exception in loading json data -- ',
                                                   'json_loader', 'load_to_table')

    return affected_row[0]
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:58,代码来源:json_loader.py


示例19: get_asmt_and_outcome_result

 def get_asmt_and_outcome_result(self, conf):
     """Fetch the assessment guids recorded for the batch in both tables.

     Table names and the batch guid are read from ``conf``.  Returns a
     pair: the ``guid_asmt`` rows of the assessment table and the distinct
     ``assessmentguid`` rows of the assessment outcome table.
     """
     with get_udl_connection() as conn:
         asmt_table = conn.get_table(conf.get(mk.ASMT_TABLE))
         asmt_outcome_table = conn.get_table(conf.get(mk.ASMT_OUTCOME_TABLE))

         asmt_query = select([asmt_table.c.guid_asmt])
         asmt_query = asmt_query.where(
             asmt_table.c.guid_batch == conf.get(mk.GUID_BATCH))
         asmt_result = conn.get_result(asmt_query)

         outcome_query = select([asmt_outcome_table.c.assessmentguid], distinct=True)
         outcome_query = outcome_query.where(
             asmt_outcome_table.c.guid_batch == conf.get(mk.GUID_BATCH))
         asmt_outcome_result = conn.get_result(outcome_query)
     return asmt_result, asmt_outcome_result
开发者ID:SmarterApp,项目名称:RDW_DataWarehouse,代码行数:9,代码来源:content_validator.py


示例20: initialize_table_source

def initialize_table_source(db, tblname, schema=None, cols=None):
    """Return a select over a table, optionally limited to some columns.

    The table is obtained from ``initialize_table(db, tblname, schema)``.
    When ``cols`` is empty or None the whole table is selected; otherwise
    only the columns named in ``cols``, in the given order.
    """
    tbl = initialize_table(db, tblname, schema)
    selected = [tbl.c[name] for name in cols] if cols else [tbl]
    return select(selected)
开发者ID:daterrell2,项目名称:datatools,代码行数:9,代码来源:database.py



注:本文中的sqlalchemy.sql.expression.select函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python expression.table函数代码示例发布时间:2022-05-27
下一篇:
Python expression.or_函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap