• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python engine.Connection类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 sqlalchemy.engine.Connection 的典型用法代码示例。如果您正苦于以下问题：Python Connection 类的具体用法？Python Connection 怎么用？Python Connection 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了 Connection 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: get_free_title

def get_free_title(connection: Connection,
                   title: str,
                   auth_user_id: str) -> str:
    """
    Pick a survey title that does not clash with the user's existing ones.

    If the requested title is unused it is returned unchanged; otherwise a
    number in parentheses is appended. For example, when the title is
    "survey":
    1. "survey" not in table -> "survey"
    2. "survey" in table     -> "survey(1)"
    3. "survey(1)" in table  -> "survey(2)"

    :param connection: a SQLAlchemy Connection
    :param title: the requested survey title
    :param auth_user_id: the UUID of the user owning the survey
    :return: a title that can be inserted safely
    """
    # EXISTS query restricted to this user's surveys with exactly this title.
    title_taken = exists().where(
        survey_table.c.survey_title == title
    ).where(survey_table.c.auth_user_id == auth_user_id)
    # The nested unpacking expects a single row holding a single column.
    (does_exist, ), = connection.execute(select((title_taken,)))
    if not does_exist:
        return title

    # Fetch every survey of this user whose title starts with the requested
    # one; _conflicting() extracts the numbers that are already in use.
    prefix_query = select([survey_table]).where(
        survey_table.c.survey_title.like(title + '%')
    ).where(
        survey_table.c.auth_user_id == auth_user_id
    )
    similar_surveys = connection.execute(prefix_query).fetchall()
    taken_numbers = list(_conflicting(title, similar_surveys))
    if taken_numbers:
        free_number = max(taken_numbers) + 1
    else:
        free_number = 1
    return title + '({})'.format(free_number)
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:33,代码来源:survey.py


示例2: delete

def delete(connection: Connection, survey_id: str):
    """
    Remove the survey identified by survey_id.

    The statement built by delete_record() is executed inside a transaction
    opened on the given connection.

    :param connection: a SQLAlchemy connection
    :param survey_id: the UUID of the survey
    :return: the result of json_response('Survey deleted')
    """
    with connection.begin():
        removal = delete_record(survey_table, 'survey_id', survey_id)
        connection.execute(removal)
    return json_response('Survey deleted')
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:10,代码来源:survey.py


示例3: _create_choices

def _create_choices(connection: Connection,
                    values: dict,
                    question_id: str,
                    submission_map: dict,
                    existing_question_id: str=None) -> Iterator:
    """
    Insert the choices of a survey question, yielding each new choice id.

    When the survey is being updated, the answers that were given to a
    carried-over choice are re-inserted against the newly created choice.
    This is a generator: nothing is written until it is iterated.

    :param connection: the SQLAlchemy Connection object for the transaction
    :param values: the dictionary of values associated with the question
    :param question_id: the UUID of the question
    :param submission_map: a dictionary mapping old submission_id to new
    :param existing_question_id: the UUID of the existing question (if this is
                                 an update)
    :return: an iterable of the resultant choice fields
    """
    new_choices, updates = _determine_choices(
        connection, existing_question_id, values['choices'])

    for choice_number, choice in enumerate(new_choices):
        insert_stmt = question_choice_insert(
            question_id=question_id,
            survey_id=values['survey_id'],
            choice=choice,
            choice_number=choice_number,
            type_constraint_name=values['type_constraint_name'],
            question_sequence_number=values['sequence_number'],
            allow_multiple=values['allow_multiple'])
        # Translate a violation of the unique-choice constraint into a
        # RepeatedChoiceError instead of a generic IntegrityError.
        error_map = [('unique_choice_names', RepeatedChoiceError(choice))]
        inserted = execute_with_exceptions(connection, insert_stmt, error_map)
        primary_key = inserted.inserted_primary_key
        question_choice_id = primary_key[0]

        if choice in updates:
            # Fields shared by every copied answer for this choice.
            # NOTE(review): the indexes 2-4 assume a particular layout of the
            # composite primary key -- confirm against the table definition.
            shared_fields = {'question_id': question_id,
                             'type_constraint_name': primary_key[2],
                             'sequence_number': primary_key[3],
                             'allow_multiple': primary_key[4],
                             'survey_id': values['survey_id']}
            old_answers = get_answer_choices_for_choice_id(connection,
                                                           updates[choice])
            for answer in old_answers:
                answer_values = dict(
                    shared_fields,
                    question_choice_id=question_choice_id,
                    submission_id=submission_map[answer.submission_id],
                    answer_choice_metadata=answer.answer_choice_metadata)
                connection.execute(answer_choice_insert(**answer_values))

        yield question_choice_id
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:53,代码来源:survey.py


示例4: insert_profile

 def insert_profile(conn: Connection, insert: str, p: Profile):
     """
     Execute the given insert statement with the values of one profile.

     The positional parameters are, in order: identifier, an id generated
     from the unified profile name, first name, last name, display name and
     link. Every textual field is passed through sanitize_text().

     :param conn: the connection the statement is executed on
     :param insert: the insert statement to execute
     :param p: the profile supplying the values
     """
     unified_name, _ = unify_profile_name(p.first_name, p.last_name)
     profile_key = generate_id(unified_name)
     text_fields = (p.first_name, p.last_name, p.display_name, p.link)
     row = (sanitize_text(p.identifier), profile_key) + tuple(
         sanitize_text(field) for field in text_fields)
     conn.execute(insert, row)
开发者ID:ankoh,项目名称:mc-server,代码行数:14,代码来源:crawl_data.py


示例5: init_db

def init_db(connection: Connection, force: bool=False, test: bool=False) -> None:
    """
    Create the main and static schemas and all tables.

    :param connection: the SQLAlchemy connection to operate on
    :param force: drop both schemas (CASCADE) first if they already exist
    :param test: additionally load the test data through setup_test_data()
    """
    # NOTE(review): the model modules are imported only for their side
    # effects, presumably to register their tables on Base.metadata -- confirm.
    import c2cgeoportal_commons.models.main  # noqa: F401
    import c2cgeoportal_commons.models.static  # noqa: F401
    from c2cgeoportal_commons.models import schema

    # Validate before deriving the static schema name; otherwise a missing
    # schema would first be formatted into the bogus name "None_static".
    assert schema is not None
    schema_static = '{}_static'.format(schema)
    schema_names = (schema, schema_static)

    if force:
        for name in schema_names:
            if schema_exists(connection, name):
                # Quote the identifier exactly as CREATE SCHEMA below does, so
                # that names which are case-sensitive or contain special
                # characters can be dropped as well as created.
                connection.execute('DROP SCHEMA "{}" CASCADE;'.format(name))

    for name in schema_names:
        if not schema_exists(connection, name):
            connection.execute('CREATE SCHEMA "{}";'.format(name))

    Base.metadata.create_all(connection)

    session_factory = get_session_factory(connection)

    with transaction.manager:
        dbsession = get_tm_session(session_factory, transaction.manager)
        if test:
            setup_test_data(dbsession)
开发者ID:yjacolin,项目名称:c2cgeoportal,代码行数:28,代码来源:initializedb.py


示例6: lock_table

def lock_table(connection: Connection, target_table: Table):
    """
    Take a PostgreSQL advisory lock (pg_advisory_xact_lock) for a table.

    The OID of the table in the pg_class relation is used as lock id. The
    lookup filters on relname only.

    :param connection: DB connection
    :param target_table: Table object
    """
    logger.debug('Locking table "%s"', target_table.name)
    oid_query = (
        select([column("oid")])
        .select_from(table("pg_class"))
        .where(column("relname") == target_table.name)
    )
    oid = connection.execute(oid_query).scalar()
    lock_query = select([func.pg_advisory_xact_lock(oid)])
    connection.execute(lock_query).scalar()
开发者ID:agdsn,项目名称:hades,代码行数:14,代码来源:db.py


示例7: survey_select

def survey_select(connection: Connection,
                  survey_id: str,
                  auth_user_id: str=None,
                  email: str=None) -> RowProxy:
    """
    Fetch one record from the survey table.

    Exactly one of auth_user_id and email must be given; it restricts the
    lookup to surveys of that user.

    :param connection: a SQLAlchemy Connection
    :param survey_id: the UUID of the survey
    :param auth_user_id: the UUID of the user
    :param email: the user's e-mail address
    :return: the corresponding record
    :raise TypeError: if both or neither of auth_user_id and email are given
    :raise SurveyDoesNotExistError: if no matching survey is found
    """
    has_user_id = auth_user_id is not None
    has_email = email is not None
    if has_user_id and has_email:
        raise TypeError('You cannot specify both auth_user_id and email')
    if not (has_user_id or has_email):
        raise TypeError('You must specify either auth_user_id or email')

    source = survey_table
    conditions = [survey_table.c.survey_id == survey_id]
    if has_user_id:
        conditions.append(survey_table.c.auth_user_id == auth_user_id)
    else:
        # The e-mail address lives in the auth_user table, so join it in.
        source = source.join(auth_user_table)
        conditions.append(auth_user_table.c.email == email)

    query = select([survey_table]).select_from(source).where(
        and_(*conditions))
    survey = connection.execute(query).first()
    if survey is None:
        raise SurveyDoesNotExistError(survey_id)
    return survey
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:33,代码来源:survey.py


示例8: get_questions

def get_questions(connection: Connection,
                  survey_id: str,
                  auth_user_id: [str, None]=None,
                  email: [str, None]=None) -> ResultProxy:
    """
    Fetch the questions of a survey, ordered by sequence number.

    Exactly one of auth_user_id and email must be given; it restricts the
    lookup to surveys of that user.

    :param connection: a SQLAlchemy Connection
    :param survey_id: the UUID of the survey
    :param auth_user_id: the UUID of the user
    :param email: the user's e-mail address
    :return: an iterable of the questions (RowProxy)
    :raise TypeError: if both or neither of auth_user_id and email are given
    """
    has_user_id = auth_user_id is not None
    has_email = email is not None
    if has_user_id and has_email:
        raise TypeError('You cannot specify both auth_user_id and email')
    if not (has_user_id or has_email):
        raise TypeError('You must specify either auth_user_id or email')

    source = question_table.join(survey_table)
    conditions = [question_table.c.survey_id == survey_id]
    if has_user_id:
        conditions.append(survey_table.c.auth_user_id == auth_user_id)
    else:
        # The e-mail address lives in the auth_user table, so join it in.
        source = source.join(auth_user_table)
        conditions.append(auth_user_table.c.email == email)

    query = select([question_table]).select_from(source).where(
        and_(*conditions)).order_by('sequence_number asc')
    return connection.execute(query)
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:32,代码来源:question.py


示例9: get_auth_attempts_at_port

def get_auth_attempts_at_port(connection: Connection,
                              nas_ip_address: netaddr.IPAddress,
                              nas_port_id: str,
                              when: Optional[DatetimeRange]=None,
                              limit: Optional[int]=None)-> Iterable[
        Tuple[str, str, Groups, Attributes, datetime]]:
    """
    Return auth attempts at a particular port of an NAS ordered by Auth-Date
    descending.

    :param connection: A SQLAlchemy connection
    :param nas_ip_address: NAS IP address
    :param nas_port_id: NAS Port ID
    :param when: Range in which Auth-Date must be within
    :param limit: Maximum number of records
    :return: An iterable that yields (User-Name, Packet-Type, Groups, Reply,
             Auth-Date)-tuples ordered by Auth-Date descending
    """
    # Python's %-style formatting has no positional "%2$s" specifiers, so the
    # arguments are passed in the order they appear in the message.
    logger.debug('Getting all auth attempts at port %s of %s',
                 nas_port_id, nas_ip_address)
    query = (
        select([radpostauth.c.UserName, radpostauth.c.PacketType,
                radpostauth.c.Groups, radpostauth.c.Reply,
                radpostauth.c.AuthDate])
        .where(and_(radpostauth.c.NASIPAddress == nas_ip_address,
                    radpostauth.c.NASPortId == nas_port_id))
        .order_by(radpostauth.c.AuthDate.desc())
    )
    if when is not None:
        # where() returns a new statement, so the result must be kept.
        # op('<@') yields a callable that takes the right-hand operand;
        # "<@" is PostgreSQL's "is contained by" operator.
        query = query.where(
            radpostauth.c.AuthDate.op('<@')(func.tstzrange(*when)))
    if limit is not None:
        query = query.limit(limit)
    return iter(connection.execute(query))
开发者ID:agdsn,项目名称:hades,代码行数:33,代码来源:db.py


示例10: _return_sql

def _return_sql(connection: Connection,
                result: object,
                survey_id: str,
                auth_user_id: str,
                question_id: str) -> object:
    """
    Pass a query result through, or raise if it is empty.

    An empty result (None or []) is turned into an exception whose type
    depends on whether the survey belongs to the given user.

    :param connection: a SQLAlchemy Connection
    :param result: the result of the SQL function
    :param survey_id: the UUID of the survey
    :param auth_user_id: the UUID of the user
    :param question_id: the UUID of the question
    :return: the result of the SQL function
    :raise NoSubmissionsToQuestionError: if there are no submissions
    :raise QuestionDoesNotExistError: if the user is not authorized
    """
    is_empty = result is None or result == []
    if not is_empty:
        return result

    # Look up the survey's owner to decide which error applies.
    owner_query = select([survey_table]).where(
        survey_table.c.survey_id == survey_id)
    proper_id = connection.execute(owner_query).first().auth_user_id
    if auth_user_id != proper_id:
        raise QuestionDoesNotExistError(question_id)
    raise NoSubmissionsToQuestionError(question_id)
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:25,代码来源:aggregation.py


示例11: execute_with_exceptions

def execute_with_exceptions(connection: Connection,
                            executable: [Insert, Update],
                            exceptions: Iterator) -> ResultProxy:
    """
    Run an executable and translate IntegrityError into specific exceptions.

    connection.execute() raises a generic IntegrityError on constraint
    violations. The exceptions parameter maps a substring of the underlying
    error message to the exception that should be raised instead; the first
    match wins. If nothing matches, the original IntegrityError propagates.

    :param connection: the SQLAlchemy connection (for transaction purposes)
    :param executable: the object to pass to connection.execute()
    :param exceptions: an iterable of (name: str, exception: Exception) tuples.
                       name is the string to look for in the IntegrityError,
                       and exception is the Exception to raise instead of
                       IntegrityError
    :return: a SQLAlchemy ResultProxy
    """
    try:
        outcome = connection.execute(executable)
    except IntegrityError as integrity_error:
        message = str(integrity_error.orig)
        for marker, replacement in exceptions:
            if marker in message:
                raise replacement
        # No marker matched: re-raise the IntegrityError unchanged.
        raise
    return outcome
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:26,代码来源:__init__.py


示例12: get_stats

def get_stats(connection: Connection,
              survey_id: str,
              email: str) -> dict:
    """
    Get statistics about the specified survey: creation time, number of
    submissions, time of the earliest submission, and time of the latest
    submission.

    :param connection: a SQLAlchemy Connection
    :param survey_id: the UUID of the survey
    :param email: the e-mail address of the user
    :return: a JSON representation of the statistics.
    :raise SurveyDoesNotExistError: if no survey with this UUID belongs to
                                    the user with the given e-mail address
    """
    # The outer join keeps a survey without submissions in the result, so a
    # missing row can only mean that the survey/e-mail pair matched nothing.
    query = select([
        survey_table.c.created_on,
        count(submission_table.c.submission_id),
        sqlmin(submission_table.c.submission_time),
        sqlmax(submission_table.c.submission_time)
    ]).select_from(
        auth_user_table.join(survey_table).outerjoin(submission_table)
    ).where(
        survey_table.c.survey_id == survey_id
    ).where(
        auth_user_table.c.email == email
    ).group_by(
        survey_table.c.survey_id
    )
    result = connection.execute(query).first()
    if result is None:
        # Previously this fell through to an opaque TypeError on result[0].
        raise SurveyDoesNotExistError(survey_id)

    created_on, num_submissions, earliest, latest = result
    return json_response({
        'created_on': maybe_isoformat(created_on),
        'num_submissions': num_submissions,
        'earliest_submission_time': maybe_isoformat(earliest),
        'latest_submission_time': maybe_isoformat(latest)
    })
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:35,代码来源:survey.py


示例13: get_sessions_of_mac

def get_sessions_of_mac(connection: Connection, mac: netaddr.EUI,
                        when: Optional[DatetimeRange]=None,
                        limit: Optional[int]=None) -> Iterable[
        Tuple[netaddr.IPAddress, str, datetime, datetime]]:
    """
    Return accounting sessions of a particular MAC address ordered by
    Session-Start-Time descending.

    :param connection: A SQLAlchemy connection
    :param mac: MAC address
    :param when: Range in which Session-Start-Time must be within
    :param limit: Maximum number of records
    :return: An iterable that yields (NAS-IP-Address, NAS-Port-Id,
    Session-Start-Time, Session-Stop-Time)-tuples ordered by Session-Start-Time
    descending
    """
    logger.debug('Getting all sessions for MAC "%s"', mac)
    query = (
        select([radacct.c.NASIPAddress, radacct.c.NASPortId,
                radacct.c.AcctStartTime,
                radacct.c.AcctStopTime])
        .where(radacct.c.UserName == mac)
        .order_by(radacct.c.AcctStartTime.desc())
    )
    if when is not None:
        # where() returns a new statement, so the result must be kept.
        # op('<@') yields a callable that takes the right-hand operand;
        # "<@" is PostgreSQL's "is contained by" operator.
        query = query.where(
            radacct.c.AcctStartTime.op('<@')(func.tstzrange(*when)))
    if limit is not None:
        query = query.limit(limit)
    return iter(connection.execute(query))
开发者ID:agdsn,项目名称:hades,代码行数:29,代码来源:db.py


示例14: update

def update(connection: Connection, data: dict):
    """
    Create a new version of a survey (title, questions).

    The existing survey is renamed so that its title ends with
    "(new version created on <time>)", and a new survey is created from
    data within the same transaction. If data carries no 'survey_metadata',
    the existing survey's metadata is stored into data before the new
    survey is created.

    :param connection: a SQLAlchemy Connection
    :param data: JSON containing the UUID of the survey and fields to update.
    :return: the result of get_one() for the newly created survey
    """
    survey_id, email = data['survey_id'], data['email']
    old_survey = survey_select(connection, survey_id, email=email)
    if 'survey_metadata' not in data:
        data['survey_metadata'] = old_survey.survey_metadata
    timestamp = datetime.datetime.now()

    with connection.begin():
        archived_title = '{} (new version created on {})'.format(
            old_survey.survey_title, timestamp.isoformat())
        rename_stmt = update_record(survey_table, 'survey_id', survey_id,
                                    survey_title=archived_title)
        # A title clash surfaces as SurveyAlreadyExistsError rather than a
        # generic IntegrityError.
        error_map = [('survey_title_survey_owner_key',
                      SurveyAlreadyExistsError(archived_title))]
        execute_with_exceptions(connection, rename_stmt, error_map)

        new_survey_id = _create_survey(connection, data)

    return get_one(connection, new_survey_id, email=email)
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:29,代码来源:survey.py


示例15: _jsonify

def _jsonify(connection: Connection,
             answer: object,
             question_id: str) -> object:
    """
    Convert an answer into a representation that can be serialized as JSON.

    The conversion depends on the type constraint of the question:
    location/facility answers become GeoJSON coordinates, date/time answers
    go through maybe_isoformat(), decimals become floats, multiple choice
    answers are replaced by the choice they refer to, and anything else is
    returned unchanged.

    :param connection: a SQLAlchemy Connection
    :param answer: a submitted value
    :param question_id: the UUID of the question
    :return: the nice representation
    """
    question = question_select(connection, question_id)
    kind = question.type_constraint_name

    if kind in {'location', 'facility'}:
        geo_json = connection.execute(func.ST_AsGeoJSON(answer)).scalar()
        return json_decode(geo_json)['coordinates']
    if kind in {'date', 'time'}:
        return maybe_isoformat(answer)
    if kind == 'decimal':
        return float(answer)
    if kind == 'multiple_choice':
        return question_choice_select(connection, answer).choice
    return answer
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:26,代码来源:aggregation.py


示例16: get_auth_attempts_of_mac

def get_auth_attempts_of_mac(connection: Connection, mac: netaddr.EUI,
                             when: Optional[DatetimeRange]=None,
                             limit: Optional[int]=None) -> Iterable[
        Tuple[netaddr.IPAddress, str, str, Groups, Attributes, datetime]]:
    """
    Return auth attempts of a particular MAC address order by Auth-Date
    descending.

    :param connection: A SQLAlchemy connection
    :param mac: MAC address
    :param when: Range in which Auth-Date must be within
    :param limit: Maximum number of records
    :return: An iterable that yields (NAS-IP-Address, NAS-Port-Id, Packet-Type,
    Groups, Reply, Auth-Date)-tuples ordered by Auth-Date descending
    """
    logger.debug('Getting all auth attempts of MAC %s', mac)
    query = (
        select([radpostauth.c.NASIPAddress, radpostauth.c.NASPortId,
                radpostauth.c.PacketType, radpostauth.c.Groups,
                radpostauth.c.Reply, radpostauth.c.AuthDate])
        .where(radpostauth.c.UserName == mac)
        .order_by(radpostauth.c.AuthDate.desc())
    )
    if when is not None:
        # where() returns a new statement, so the result must be kept.
        # op('<@') yields a callable that takes the right-hand operand;
        # "<@" is PostgreSQL's "is contained by" operator.
        query = query.where(
            radpostauth.c.AuthDate.op('<@')(func.tstzrange(*when)))
    if limit is not None:
        query = query.limit(limit)
    return iter(connection.execute(query))
开发者ID:agdsn,项目名称:hades,代码行数:28,代码来源:db.py


示例17: _copy_submission_entries

def _copy_submission_entries(connection: Connection,
                             existing_survey_id: str,
                             new_survey_id: str,
                             email: str) -> tuple:
    """
    Re-insert the submissions of an existing survey for its updated copy.

    This is a generator: for every submission of the existing survey a new
    submission row pointing at new_survey_id is inserted, and the pair of
    old and new submission IDs is yielded.

    :param connection: the SQLAlchemy connection used for the transaction
    :param existing_survey_id: the UUID of the existing survey
    :param new_survey_id: the UUID of the survey's updated copy
    :param email: the user's e-mail address
    :return: yields tuples containing the old and new submission IDs
    """
    old_submissions = get_submissions_by_email(
        connection, email,
        survey_id=existing_survey_id
    )
    for sub in old_submissions:
        insert_stmt = submission_insert(
            submitter=sub.submitter,
            submitter_email=sub.submitter_email,
            submission_time=sub.submission_time,
            save_time=sub.save_time,
            survey_id=new_survey_id)
        inserted = connection.execute(insert_stmt)
        yield sub.submission_id, inserted.inserted_primary_key[0]
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:25,代码来源:survey.py


示例18: create_user

def create_user(connection: Connection, data: dict) -> dict:
    """
    Register a new user account unless the e-mail is already known.

    :param connection: a SQLAlchemy Connection
    :param data: a dictionary holding the user's e-mail under 'email'
    :return: a response containing the e-mail and whether it was created or
    already exists in the database
    """
    email = data['email']
    try:
        get_auth_user_by_email(connection, email)
        outcome = 'Already exists'
    except UserDoesNotExistError:
        # Unknown e-mail: insert the user inside its own transaction.
        with connection.begin():
            connection.execute(create_auth_user(email=email))
        outcome = 'Created'
    return json_response({'email': email, 'response': outcome})
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:17,代码来源:user.py


示例19: refresh_and_diff_materialized_view

def refresh_and_diff_materialized_view(
        connection: Connection, view: Table, copy: Table,
        result_columns: Iterable[Column]) -> Tuple[
        List[Tuple], List[Tuple], List[Tuple]]:
    """
    Refresh a materialized view and report how its contents changed.

    Within a single transaction the view is locked, a temporary copy of it
    is created, the view is refreshed, and the view is then diffed against
    the copy.

    :param connection: A SQLAlchemy connection
    :param view: the materialized view to refresh
    :param copy: the table used for the temporary copy
    :param result_columns: the columns passed on to diff_tables()
    :return: the three lists of tuples produced by diff_tables()
    """
    with connection.begin():
        lock_table(connection, view)
        create_temp_copy(connection, view, copy)
        refresh_materialized_view(connection, view)
        differences = diff_tables(connection, view, copy, result_columns)
    return differences
开发者ID:agdsn,项目名称:hades,代码行数:9,代码来源:db.py


示例20: bar_graph

def bar_graph(connection: Connection,
              question_id: str,
              auth_user_id: str=None,
              email: str=None,
              limit: [int, None]=None,
              count_order: bool=False) -> dict:
    """
    Count how often each submitted value appears for a question.

    You must provide either an auth_user_id or e-mail address.

    :param connection: a SQLAlchemy Connection
    :param question_id: the UUID of the question
    :param auth_user_id: the UUID of the user
    :param email: the e-mail address of the user.
    :param limit: a limit on the number of results
    :param count_order: whether to order from largest count to smallest
    :return: a JSON dict containing the result as [value, count] pairs,
             tagged with 'query': 'bar_graph'
    """
    user_id = _get_user_id(connection, auth_user_id, email)

    allowable_types = {'text', 'integer', 'decimal', 'multiple_choice', 'date',
                       'time', 'location', 'facility'}

    question = question_select(connection, question_id)
    tcn = _get_type_constraint_name(allowable_types, question)

    # Assume that you only want to consider the non-other answers
    answer_table, column_name = _table_and_column(tcn)
    same_question = (
        answer_table.c.question_id == question_table.c.question_id)
    joined = answer_table.join(question_table, same_question).join(
        survey_table)

    conds = [question_table.c.question_id == question_id,
             survey_table.c.auth_user_id == user_id]
    column = get_column(answer_table, column_name)

    # One row per distinct value together with its number of occurrences.
    grouped = select(
        [column, sqlcount(column)]
    ).select_from(joined).group_by(column)
    if count_order:
        ordering = desc(sqlcount(column))
    else:
        ordering = column
    final_query = grouped.order_by(ordering).where(and_(*conds)).limit(limit)

    rows = _return_sql(connection, connection.execute(final_query),
                       question.survey_id, user_id, question_id)
    pairs = []
    for row in rows:
        pairs.append([_jsonify(connection, row[0], question_id), row[1]])

    # The second check raises if the query produced no rows at all.
    checked_pairs = _return_sql(connection, pairs, question.survey_id,
                                user_id, question_id)
    response = json_response(checked_pairs)
    response['query'] = 'bar_graph'
    return response
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:57,代码来源:aggregation.py



注:本文中的sqlalchemy.engine.Connection类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python reflection.Inspector类代码示例发布时间:2022-05-27
下一篇:
Python engine.create_engine函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap