• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python sqlalchemy.literal_column函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中sqlalchemy.literal_column函数的典型用法代码示例。如果您正苦于以下问题:Python literal_column函数的具体用法?Python literal_column怎么用?Python literal_column使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了literal_column函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_group_concat_sqlite_one_arg

def test_group_concat_sqlite_one_arg(db_session):
    """
    It should use SQLite's deafult arguments (comma delimiter)
    """
    from sqlalchemy import literal_column
    from occams_datastore.utils.sql import group_concat

    if db_session.bind.url.drivername != 'sqlite':
        pytest.skip('Not using SQLite')

    data = (
        db_session.query(
            literal_column("'myitem'").label('name'),
            literal_column("'foo'").label('value'))
        .union(
            db_session.query(
                literal_column("'myitem'").label('name'),
                literal_column("'bar'").label('value')))
        .subquery())

    query = (
        db_session.query(group_concat(data.c.value))
        .select_from(data)
        .group_by(data.c.name))

    result, = query.one()
    assert sorted(['foo', 'bar']) == sorted(result.split(','))
开发者ID:jkrooskos,项目名称:occams_datastore,代码行数:27,代码来源:test_sql.py


示例2: test_tuple_containment

    def test_tuple_containment(self):

        for test, exp in [
            ([("a", "b")], True),
            ([("a", "c")], False),
            ([("f", "q"), ("a", "b")], True),
            ([("f", "q"), ("a", "c")], False),
        ]:
            eq_(
                testing.db.execute(
                    select(
                        [
                            tuple_(
                                literal_column("'a'"), literal_column("'b'")
                            ).in_(
                                [
                                    tuple_(
                                        *[
                                            literal_column("'%s'" % letter)
                                            for letter in elem
                                        ]
                                    )
                                    for elem in test
                                ]
                            )
                        ]
                    )
                ).scalar(),
                exp,
            )
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:30,代码来源:test_query.py


示例3: execute

    def execute(self, connection, filter_values):
        max_date_query = sqlalchemy.select([
            sqlalchemy.func.max(sqlalchemy.column('completed_on')).label('completed_on'),
            sqlalchemy.column('case_id').label('case_id')
        ]).select_from(sqlalchemy.table(self.table_name))

        if self.filters:
            for filter in self.filters:
                max_date_query.append_whereclause(filter.build_expression())

        max_date_query.append_group_by(
            sqlalchemy.column('case_id')
        )

        max_date_subquery = sqlalchemy.alias(max_date_query, 'max_date')

        asha_table = self.get_asha_table_name()
        checklist_query = sqlalchemy.select()
        for column in self.columns:
            checklist_query.append_column(column.build_column())

        checklist_query = checklist_query.where(
            sqlalchemy.literal_column('"{}".case_id'.format(asha_table)) == max_date_subquery.c.case_id
        ).where(
            sqlalchemy.literal_column('"{}".completed_on'.format(asha_table)) == max_date_subquery.c.completed_on
        ).select_from(sqlalchemy.table(asha_table))

        return connection.execute(checklist_query, **filter_values).fetchall()
开发者ID:kkrampa,项目名称:commcare-hq,代码行数:28,代码来源:sql_data.py


示例4: test_row_case_sensitive_unoptimized

    def test_row_case_sensitive_unoptimized(self):
        ins_db = engines.testing_engine(options={"case_sensitive": True})
        row = ins_db.execute(
            select([
                literal_column("1").label("case_insensitive"),
                literal_column("2").label("CaseSensitive"),
                text("3 AS screw_up_the_cols")
            ])
        ).first()

        eq_(
            list(row.keys()),
            ["case_insensitive", "CaseSensitive", "screw_up_the_cols"])

        in_("case_insensitive", row._keymap)
        in_("CaseSensitive", row._keymap)
        not_in_("casesensitive", row._keymap)

        eq_(row["case_insensitive"], 1)
        eq_(row["CaseSensitive"], 2)
        eq_(row["screw_up_the_cols"], 3)

        assert_raises(KeyError, lambda: row["Case_insensitive"])
        assert_raises(KeyError, lambda: row["casesensitive"])
        assert_raises(KeyError, lambda: row["screw_UP_the_cols"])
开发者ID:mattastica,项目名称:sqlalchemy,代码行数:25,代码来源:test_resultset.py


示例5: test_group_concat_postgresql_invalid_args

def test_group_concat_postgresql_invalid_args(db_session):
    """
    It should only support at least two arguments in PostgreSQL
    """
    from sqlalchemy import literal_column
    from occams_datastore.utils.sql import group_concat

    if db_session.bind.url.drivername != 'postgresql':
        pytest.skip('Not using PostgreSQL')

    data = (
        db_session.query(
            literal_column("'myitem'").label('name'),
            literal_column("'foo'").label('value'))
        .union(
            db_session.query(
                literal_column("'myitem'").label('name'),
                literal_column("'bar'").label('value')))
        .subquery())

    query = (
        db_session.query(group_concat(data.c.value))
        .select_from(data)
        .group_by(data.c.name))

    with pytest.raises(TypeError):
        result, = query.one()
开发者ID:jkrooskos,项目名称:occams_datastore,代码行数:27,代码来源:test_sql.py


示例6: test_tuple_containment

    def test_tuple_containment(self):

        for test, exp in [
            ([('a', 'b')], True),
            ([('a', 'c')], False),
            ([('f', 'q'), ('a', 'b')], True),
            ([('f', 'q'), ('a', 'c')], False)
        ]:
            eq_(
                testing.db.execute(
                    select([
                        tuple_(
                            literal_column("'a'"),
                            literal_column("'b'")
                        ).
                        in_([
                            tuple_(*[
                                literal_column("'%s'" % letter)
                                for letter in elem
                            ]) for elem in test
                        ])
                    ])
                ).scalar(),
                exp
            )
开发者ID:anti-social,项目名称:sqlalchemy,代码行数:25,代码来源:test_query.py


示例7: test_row_case_sensitive

    def test_row_case_sensitive(self):
        row = testing.db.execute(
            select([
                literal_column("1").label("case_insensitive"),
                literal_column("2").label("CaseSensitive")
            ])
        ).first()

        eq_(list(row.keys()), ["case_insensitive", "CaseSensitive"])

        in_("case_insensitive", row._keymap)
        in_("CaseSensitive", row._keymap)
        not_in_("casesensitive", row._keymap)

        eq_(row["case_insensitive"], 1)
        eq_(row["CaseSensitive"], 2)

        assert_raises(
            KeyError,
            lambda: row["Case_insensitive"]
        )
        assert_raises(
            KeyError,
            lambda: row["casesensitive"]
        )
开发者ID:mattastica,项目名称:sqlalchemy,代码行数:25,代码来源:test_resultset.py


示例8: test_percent_sign_round_trip

    def test_percent_sign_round_trip(self):
        """test that the DBAPI accommodates for escaped / nonescaped
        percent signs in a way that matches the compiler

        """
        m = self.metadata
        t = Table('t', m, Column('data', String(50)))
        t.create(config.db)
        with config.db.begin() as conn:
            conn.execute(t.insert(), dict(data="some % value"))
            conn.execute(t.insert(), dict(data="some %% other value"))

            eq_(
                conn.scalar(
                    select([t.c.data]).where(
                        t.c.data == literal_column("'some % value'"))
                ),
                "some % value"
            )

            eq_(
                conn.scalar(
                    select([t.c.data]).where(
                        t.c.data == literal_column("'some %% other value'"))
                ), "some %% other value"
            )
开发者ID:ArthurGarnier,项目名称:SickRage,代码行数:26,代码来源:test_dialect.py


示例9: test_text_doesnt_explode

    def test_text_doesnt_explode(self):

        for s in [
            select(
                [
                    case(
                        [
                            (
                                info_table.c.info == 'pk_4_data',
                                text("'yes'"))],
                        else_=text("'no'"))
                ]).order_by(info_table.c.info),

            select(
                [
                    case(
                        [
                            (
                                info_table.c.info == 'pk_4_data',
                                literal_column("'yes'"))],
                        else_=literal_column("'no'")
                    )]
            ).order_by(info_table.c.info),

        ]:
            if testing.against("firebird"):
                eq_(s.execute().fetchall(), [
                    ('no ', ), ('no ', ), ('no ', ), ('yes', ),
                    ('no ', ), ('no ', ),
                ])
            else:
                eq_(s.execute().fetchall(), [
                    ('no', ), ('no', ), ('no', ), ('yes', ),
                    ('no', ), ('no', ),
                ])
开发者ID:Attsun1031,项目名称:sqlalchemy,代码行数:35,代码来源:test_case_statement.py


示例10: test_select_composition_seven

 def test_select_composition_seven(self):
     self.assert_compile(
         select([
             literal_column('col1'),
             literal_column('col2')
         ], from_obj=table('tablename')).alias('myalias'),
         "SELECT col1, col2 FROM tablename"
     )
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:8,代码来源:test_text.py


示例11: test_select_composition_seven

 def test_select_composition_seven(self):
     self.assert_compile(
         select(
             [literal_column("col1"), literal_column("col2")],
             from_obj=table("tablename"),
         ).alias("myalias"),
         "SELECT col1, col2 FROM tablename",
     )
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:8,代码来源:test_text.py


示例12: on_insert_also

 def on_insert_also(self):
     Foo2 = sql.t.Foo2
     n_column = sqlalchemy.literal_column('1').label('n')
     return (Foo2.insert().values(n=sqlalchemy.literal_column('new.id'),
                                  foo=sqlalchemy.literal_column('new.description')),
             sql.InsertFromSelect(Foo2, sqlalchemy.select([n_column])),
             "select 42",
             )
开发者ID:nicLucian,项目名称:pytis,代码行数:8,代码来源:demo.py


示例13: _update_current_rev

 def _update_current_rev(self, old, new):
     if old == new:
         return
     if new is None:
         self.impl._exec(self._version.delete())
     elif old is None:
         self.impl._exec(self._version.insert().values(version_num=literal_column("'%s'" % new)))
     else:
         self.impl._exec(self._version.update().values(version_num=literal_column("'%s'" % new)))
开发者ID:leslyKay,项目名称:me,代码行数:9,代码来源:migration.py


示例14: data

    def data(self,
             use_choice_labels=False,
             expand_collections=False,
             ignore_private=True):
        session = self.db_session
        query = (
            session.query(
                models.Patient.id.label('id'),
                models.Site.name.label('site'),
                models.Patient.pid.label('pid'))
            .join(models.Site))

        # BBB 2014-02-20 (Marco): AEH needs Early Test
        EarlyTest = aliased(models.Enrollment)
        subquery = (
            session.query(EarlyTest.patient_id, EarlyTest.reference_number)
            .filter(EarlyTest.study.has(
                models.Study.code.in_([literal_column("'ET'"),
                                       literal_column("'LTW'"),
                                       literal_column("'CVCT'")])))
            .subquery())
        query = (
            query
            .outerjoin(subquery, subquery.c.patient_id == models.Patient.id)
            .add_column(subquery.c.reference_number.label('early_id')))

        # Add every known reference number
        for reftype in self.reftypes:
            query = query.add_column(
                session.query(
                    group_concat(
                        models.PatientReference.reference_number, ';'))
                .filter(
                    models.PatientReference.patient_id == models.Patient.id)
                .filter(
                    models.PatientReference.reference_type_id == reftype.id)
                .group_by(models.PatientReference.patient_id)
                .correlate(models.Patient)
                .as_scalar()
                .label(reftype.name))

        CreateUser = aliased(datastore.User)
        ModifyUser = aliased(datastore.User)

        query = (
            query
            .join(CreateUser, models.Patient.create_user)
            .join(ModifyUser, models.Patient.modify_user)
            .add_columns(
                models.Patient.create_date,
                CreateUser.key.label('create_user'),
                models.Patient.modify_date,
                ModifyUser.key.label('modify_user'))
            .order_by(models.Patient.id))

        return query
开发者ID:davidmote,项目名称:occams_studies,代码行数:56,代码来源:pid.py


示例15: search_query

def search_query(cls, tokens, weight_func=None, include_misses=False, ordered=True):

    # Read the searchable columns from the table (strings)
    columns = cls.__searchable_columns__

    # Convert the columns from strings into column objects
    columns = [getattr(cls, c) for c in columns]

    # The model name that can be used to match search result to model
    cls_name = literal_column("'{}'".format(cls.__name__))

    # Filter out id: tokens for later
    ids, tokens = process_id_option(tokens)

    # If there are still tokens left after id: token filtering
    if tokens:
        # Generate the search weight expression from the
        # searchable columns, tokens and patterns
        if not weight_func:
            weight_func = weight_expression

        weight = weight_func(columns, tokens)

    # If the search expression only included "special" tokens like id:
    else:
        weight = literal_column(str(1))

    # Create an array of stringified detail columns
    details = getattr(cls, "__search_detail_columns__", None)
    if details:
        details = [cast(getattr(cls, d), Unicode) for d in details]
    else:
        details = [literal_column("NULL")]

    # Create a query object
    query = db.session.query(
        cls_name.label("model"),
        cls.id.label("id"),
        cls.name.label("name"),
        array(details).label("details"),
        weight.label("weight"),
    )

    # Filter out specific ids (optional)
    if ids:
        query = query.filter(cls.id.in_(ids))

    # Filter out results that don't match the patterns at all (optional)
    if not include_misses:
        query = query.filter(weight > 0)

    # Order by weight (optional)
    if ordered:
        query = query.order_by(desc(weight))

    return query
开发者ID:skylines-project,项目名称:skylines,代码行数:56,代码来源:search.py


示例16: get_next_bus

def get_next_bus(mc, db, stop_id):
    now_datetime = datetime.datetime.utcnow().replace(tzinfo=UTC)
    now_datetime = LONDON.normalize(now_datetime.astimezone(LONDON))
    now_day = now_datetime.weekday()
    now_time = now_datetime.time()

    mc_key = "V6:USERSTOP:" +  str(stop_id) + "USERTIME:" + now_datetime.strftime("%w%H%M")

    bus = mc.get(mc_key)
    if not bus:
        today_query = db.query(DepartureTimeDeref, literal_column("0").label("days_future")).\
                                      filter_by(bus_stop_id=stop_id).\
                                      filter(DepartureTimeDeref.time >= now_time).\
                                      filter(DepartureTimeDeref.valid_days.contains(cast([DAYS[now_day]], postgresql.ARRAY(String)))).\
                                      join(DepartureTimeDeref.timetable).\
                                      filter(Timetable.valid_from <= now_datetime.date()).\
                                      filter(Timetable.valid_to >= now_datetime.date())

        single_day_delta = datetime.timedelta(days=1)
        tomorrow_query = db.query(DepartureTimeDeref, literal_column("1").label("days_future")).\
                                      filter_by(bus_stop_id=stop_id).\
                                      filter(DepartureTimeDeref.valid_days.contains(cast([DAYS[now_day + 1]], postgresql.ARRAY(String)))).\
                                      join(DepartureTimeDeref.timetable).\
                                      filter(Timetable.valid_from <= now_datetime.date() + single_day_delta).\
                                      filter(Timetable.valid_to >= now_datetime.date() + single_day_delta)


        bus = today_query.union_all(tomorrow_query).\
                            options(joinedload(DepartureTimeDeref.timetable, Timetable.route)).\
                            order_by("days_future").\
                            order_by(DepartureTimeDeref.time).\
                            first()

        if bus:
            bus = {'departure': bus[0].to_JSON(), 'days_future': int(bus[1])}

        mc.set(mc_key, bus, 30*24*60*60)

    if bus:
        departure = bus['departure']
        # Create a day delta, is this departure time today or tomorrow?
        day_delta = datetime.timedelta(days=bus['days_future'])
        departure_day = now_datetime.date() + day_delta

        departure_dt = datetime.datetime.combine(departure_day, departure['time'])
        # Add timezone infomation. pytz will handle DST correctly
        departure_dt = LONDON.localize(departure_dt)

        departure['time'] = departure_dt

        bus = departure

    return bus
开发者ID:thomaspurchas,项目名称:Wheres-the-Damn-U1,代码行数:53,代码来源:bus.py


示例17: timeseries

    def timeseries(self, agg_unit, start, end, geom=None, column_filters=None):
        # Reading this blog post
        # http://no0p.github.io/postgresql/2014/05/08/timeseries-tips-pg.html
        # inspired this implementation.
        t = self.point_table

        # Special case for the 'quarter' unit of aggregation.
        step = '3 months' if agg_unit == 'quarter' else '1 ' + agg_unit

        # Create a CTE to represent every time bucket in the timeseries
        # with a default count of 0
        day_generator = func.generate_series(func.date_trunc(agg_unit, start),
                                             func.date_trunc(agg_unit, end),
                                             step)
        defaults = select([sa.literal_column("0").label('count'),
                           day_generator.label('time_bucket')])\
            .alias('defaults')

        where_filters = [t.c.point_date >= start, t.c.point_date <= end]
        if column_filters is not None:
            # Column filters has to be iterable here, because the '+' operator
            # behaves differently for SQLAlchemy conditions. Instead of
            # combining the conditions together, it would try to build
            # something like :param1 + <column_filters> as a new condition.
            where_filters += [column_filters]

        # Create a CTE that grabs the number of records contained in each time
        # bucket. Will only have rows for buckets with records.
        actuals = select([func.count(t.c.hash).label('count'),
                          func.date_trunc(agg_unit, t.c.point_date).
                         label('time_bucket')])\
            .where(sa.and_(*where_filters))\
            .group_by('time_bucket')

        # Also filter by geometry if requested
        if geom:
            contains = func.ST_Within(t.c.geom, func.ST_GeomFromGeoJSON(geom))
            actuals = actuals.where(contains)

        # Need to alias to make it usable in a subexpression
        actuals = actuals.alias('actuals')

        # Outer join the default and observed values
        # to create the timeseries select statement.
        # If no observed value in a bucket, use the default.
        name = sa.literal_column("'{}'".format(self.dataset_name))\
            .label('dataset_name')
        bucket = defaults.c.time_bucket.label('time_bucket')
        count = func.coalesce(actuals.c.count, defaults.c.count).label('count')
        ts = select([name, bucket, count]).\
            select_from(defaults.outerjoin(actuals, actuals.c.time_bucket == defaults.c.time_bucket))

        return ts
开发者ID:gitter-badger,项目名称:plenario,代码行数:53,代码来源:models.py


示例18: timeseries

    def timeseries(self, agg_unit, start, end, geom=None, column_filters=None):
        # Reading this blog post
        # http://no0p.github.io/postgresql/2014/05/08/timeseries-tips-pg.html
        # inspired this implementation.
        t = self.point_table

        if agg_unit == 'quarter':
            step = '3 months'
        else:
            step = '1 ' + agg_unit
        # Create a CTE to represent every time bucket in the timeseries
        # with a default count of 0
        day_generator = func.generate_series(func.date_trunc(agg_unit, start),
                                             func.date_trunc(agg_unit, end),
                                             step)
        defaults = select([sa.literal_column("0").label('count'),
                           day_generator.label('time_bucket')])\
            .alias('defaults')

        # Create a CTE that grabs the number of records
        # contained in each time bucket.
        # Will only have rows for buckets with records.
        where_filters = [t.c.point_date >= start,
                         t.c.point_date <= end]
        if column_filters:
            where_filters += column_filters

        actuals = select([func.count(t.c.hash).label('count'),
                          func.date_trunc(agg_unit, t.c.point_date).
                         label('time_bucket')])\
            .where(sa.and_(*where_filters))\
            .group_by('time_bucket')

        # Also filter by geometry if requested
        if geom:
            contains = func.ST_Within(t.c.geom, func.ST_GeomFromGeoJSON(geom))
            actuals = actuals.where(contains)

        # Need to alias to make it usable in a subexpression
        actuals = actuals.alias('actuals')

        # Outer join the default and observed values
        # to create the timeseries select statement.
        # If no observed value in a bucket, use the default.
        name = sa.literal_column("'{}'".format(self.dataset_name))\
            .label('dataset_name')
        bucket = defaults.c.time_bucket.label('time_bucket')
        count = func.coalesce(actuals.c.count, defaults.c.count).label('count')
        ts = select([name, bucket, count]).\
            select_from(defaults.outerjoin(actuals, actuals.c.time_bucket == defaults.c.time_bucket))

        return ts
开发者ID:carhart,项目名称:plenario,代码行数:52,代码来源:models.py


示例19: test_select_composition_six

 def test_select_composition_six(self):
     # test that "auto-labeling of subquery columns"
     # doesn't interfere with literal columns,
     # exported columns don't get quoted
     self.assert_compile(
         select([
             literal_column("column1 AS foobar"),
             literal_column("column2 AS hoho"), table1.c.myid],
             from_obj=[table1]).select(),
         "SELECT column1 AS foobar, column2 AS hoho, myid FROM "
         "(SELECT column1 AS foobar, column2 AS hoho, "
         "mytable.myid AS myid FROM mytable)"
     )
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:13,代码来源:test_text.py


示例20: _update_current_rev

 def _update_current_rev(self, old, new):
     if old == new:  # pragma: no cover
         return
     if new is None:  # pragma: no cover
         self.impl._exec(Version.__table__.delete().where(package=self.pkg_name))
     elif old is None:
         self.impl._exec(
             Version.__table__.insert().values(package=self.pkg_name, version_num=sqla.literal_column("'%s'" % new))
         )
     else:
         self.impl._exec(
             Version.__table__.update().values(package=self.pkg_name, version_num=sqla.literal_column("'%s'" % new))
         )
开发者ID:webmaven,项目名称:ptah,代码行数:13,代码来源:migrate.py



注:本文中的sqlalchemy.literal_column函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python sqlalchemy.null函数代码示例发布时间:2022-05-27
下一篇:
Python sqlalchemy.join函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap