本文整理汇总了Python中sqlalchemy.engine.Connection类的典型用法代码示例。如果您正苦于以下问题:Python Connection类的具体用法?Python Connection怎么用?Python Connection使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Connection类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_free_title
def get_free_title(connection: Connection,
                   title: str,
                   auth_user_id: str) -> str:
    """
    Return a survey title that does not clash with the user's existing ones.

    If the requested title is not yet used by this user it is returned
    unchanged. Otherwise a parenthesised number is appended: one more than
    the largest number reported by ``_conflicting`` for the user's
    similarly-named surveys, or 1 when it reports none.

    :param connection: a SQLAlchemy Connection
    :param title: the desired survey title
    :param auth_user_id: the UUID of the user who owns the survey
    :return: a title that can be inserted safely
    """
    # The EXISTS query produces exactly one row with one boolean column.
    taken_query = select((exists().where(
        survey_table.c.survey_title == title
    ).where(survey_table.c.auth_user_id == auth_user_id),))
    (title_taken, ), = connection.execute(taken_query)
    if not title_taken:
        return title

    # Fetch every survey of this user whose title starts with the
    # requested one; _conflicting extracts the numbers already in use.
    prefix_query = select([survey_table]).where(
        survey_table.c.survey_title.like(title + '%')
    ).where(
        survey_table.c.auth_user_id == auth_user_id
    )
    prefix_matches = connection.execute(prefix_query).fetchall()
    used_numbers = list(_conflicting(title, prefix_matches))

    if used_numbers:
        next_number = max(used_numbers) + 1
    else:
        next_number = 1
    return title + '({})'.format(next_number)
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:33,代码来源:survey.py
示例2: delete
def delete(connection: Connection, survey_id: str):
    """
    Delete the survey identified by survey_id.

    The delete statement is executed inside a transaction started on the
    given connection.

    :param connection: a SQLAlchemy connection
    :param survey_id: the UUID of the survey
    :return: the value of json_response('Survey deleted')
    """
    with connection.begin():
        statement = delete_record(survey_table, 'survey_id', survey_id)
        connection.execute(statement)
    return json_response('Survey deleted')
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:10,代码来源:survey.py
示例3: _create_choices
def _create_choices(connection: Connection,
                    values: dict,
                    question_id: str,
                    submission_map: dict,
                    existing_question_id: str=None) -> Iterator:
    """
    Insert the choices of a survey question and yield each new choice id.

    This is a generator: nothing is inserted until it is iterated. For a
    choice that ``_determine_choices`` lists in its updates mapping, the
    answers attached to the previous choice are re-inserted against the
    newly created one.

    :param connection: the SQLAlchemy Connection object for the transaction
    :param values: the dictionary of values associated with the question
    :param question_id: the UUID of the question
    :param submission_map: a dictionary mapping old submission_id to new
    :param existing_question_id: the UUID of the existing question (if this is
                                 an update)
    :return: an iterable of the resultant choice ids
    """
    new_choices, updates = _determine_choices(
        connection, existing_question_id, values['choices'])

    for position, choice in enumerate(new_choices):
        insert_stmt = question_choice_insert(
            question_id=question_id,
            survey_id=values['survey_id'],
            choice=choice,
            choice_number=position,
            type_constraint_name=values['type_constraint_name'],
            question_sequence_number=values['sequence_number'],
            allow_multiple=values['allow_multiple'])
        # Translate a violation of the unique-choice constraint into a
        # domain-specific error.
        replacements = [('unique_choice_names', RepeatedChoiceError(choice))]
        inserted = execute_with_exceptions(
            connection, insert_stmt, replacements)
        primary_key = inserted.inserted_primary_key
        new_choice_id = primary_key[0]

        if choice in updates:
            # Positions 2-4 of the inserted primary key are read back as
            # the question's type constraint, sequence number and
            # allow_multiple flag.
            shared_fields = {'question_id': question_id,
                             'type_constraint_name': primary_key[2],
                             'sequence_number': primary_key[3],
                             'allow_multiple': primary_key[4],
                             'survey_id': values['survey_id']}
            old_answers = get_answer_choices_for_choice_id(
                connection, updates[choice])
            for old_answer in old_answers:
                copied = dict(
                    shared_fields,
                    question_choice_id=new_choice_id,
                    submission_id=submission_map[old_answer.submission_id],
                    answer_choice_metadata=old_answer.answer_choice_metadata)
                connection.execute(answer_choice_insert(**copied))

        yield new_choice_id
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:53,代码来源:survey.py
示例4: insert_profile
def insert_profile(conn: Connection, insert: str, p: Profile):
    """
    Execute the given insert statement for a single profile.

    The row consists of the sanitized identifier, an id generated from the
    unified first/last name, and the sanitized first name, last name,
    display name and link, in that order.

    :param conn: a SQLAlchemy Connection
    :param insert: the insert statement to execute
    :param p: the profile whose fields are inserted
    """
    unified_name, _ = unify_profile_name(p.first_name, p.last_name)
    name_id = generate_id(unified_name)

    identifier = sanitize_text(p.identifier)
    first_name = sanitize_text(p.first_name)
    last_name = sanitize_text(p.last_name)
    display_name = sanitize_text(p.display_name)
    link = sanitize_text(p.link)

    row = (identifier, name_id, first_name, last_name, display_name, link)
    conn.execute(insert, row)
开发者ID:ankoh,项目名称:mc-server,代码行数:14,代码来源:crawl_data.py
示例5: init_db
def init_db(connection: Connection, force: bool=False, test: bool=False) -> None:
    """
    Create the main and static schemas and all tables of the model.

    :param connection: a SQLAlchemy Connection
    :param force: when true, drop both schemas (CASCADE) before creating
                  them again
    :param test: when true, populate the database via setup_test_data
    """
    # Imported for their side effect of registering the models.
    import c2cgeoportal_commons.models.main  # noqa: F401
    import c2cgeoportal_commons.models.static  # noqa: F401
    from c2cgeoportal_commons.models import schema

    # Validate before the value is used to derive the static schema name.
    assert schema is not None
    schema_static = '{}_static'.format(schema)
    schema_names = (schema, schema_static)

    if force:
        for name in schema_names:
            if schema_exists(connection, name):
                # Quote the identifier exactly like CREATE SCHEMA below;
                # an unquoted name is case-folded by PostgreSQL and would
                # not match a schema created with a quoted mixed-case name.
                connection.execute(
                    'DROP SCHEMA "{}" CASCADE;'.format(name))

    for name in schema_names:
        if not schema_exists(connection, name):
            connection.execute('CREATE SCHEMA "{}";'.format(name))

    Base.metadata.create_all(connection)

    session_factory = get_session_factory(connection)
    with transaction.manager:
        dbsession = get_tm_session(session_factory, transaction.manager)
        if test:
            setup_test_data(dbsession)
开发者ID:yjacolin,项目名称:c2cgeoportal,代码行数:28,代码来源:initializedb.py
示例6: lock_table
def lock_table(connection: Connection, target_table: Table):
    """
    Take a PostgreSQL advisory lock that stands for the given table.

    The table's OID is looked up in pg_class and passed to
    pg_advisory_xact_lock as the lock id.

    :param connection: DB connection
    :param target_table: Table object
    """
    logger.debug('Locking table "%s"', target_table.name)

    # NOTE(review): the lookup filters on relname only, not on the schema.
    oid_query = (
        select([column("oid")])
        .select_from(table("pg_class"))
        .where(column("relname") == target_table.name)
    )
    table_oid = connection.execute(oid_query).scalar()

    lock_query = select([func.pg_advisory_xact_lock(table_oid)])
    connection.execute(lock_query).scalar()
开发者ID:agdsn,项目名称:hades,代码行数:14,代码来源:db.py
示例7: survey_select
def survey_select(connection: Connection,
                  survey_id: str,
                  auth_user_id: str=None,
                  email: str=None) -> RowProxy:
    """
    Fetch one survey record, restricted to the given user.

    Exactly one of auth_user_id and email must be supplied. When the
    e-mail is used, the survey table is joined with the auth_user table.

    :param connection: a SQLAlchemy Connection
    :param survey_id: the UUID of the survey
    :param auth_user_id: the UUID of the user
    :param email: the user's e-mail address
    :return: the corresponding record
    :raise TypeError: if both or neither of auth_user_id and email are given
    :raise SurveyDoesNotExistError: if no matching survey is found
    """
    if auth_user_id is not None and email is not None:
        raise TypeError('You cannot specify both auth_user_id and email')
    if auth_user_id is None and email is None:
        raise TypeError('You must specify either auth_user_id or email')

    source = survey_table
    filters = [survey_table.c.survey_id == survey_id]
    if auth_user_id is not None:
        filters.append(survey_table.c.auth_user_id == auth_user_id)
    else:
        source = source.join(auth_user_table)
        filters.append(auth_user_table.c.email == email)

    query = select([survey_table]).select_from(source).where(and_(*filters))
    record = connection.execute(query).first()
    if record is None:
        raise SurveyDoesNotExistError(survey_id)
    return record
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:33,代码来源:survey.py
示例8: get_questions
def get_questions(connection: Connection,
                  survey_id: str,
                  auth_user_id: [str, None]=None,
                  email: [str, None]=None) -> ResultProxy:
    """
    Get all the questions of a survey, ordered by ascending sequence
    number and restricted to the given user.

    Exactly one of auth_user_id and email must be supplied. When the
    e-mail is used, the auth_user table is joined in as well.

    :param connection: a SQLAlchemy Connection
    :param survey_id: the UUID of the survey
    :param auth_user_id: the UUID of the user
    :param email: the user's e-mail address
    :return: an iterable of the questions (RowProxy)
    :raise TypeError: if both or neither of auth_user_id and email are given
    """
    table = question_table.join(survey_table)
    conds = [question_table.c.survey_id == survey_id]

    if auth_user_id is not None:
        if email is not None:
            raise TypeError('You cannot specify both auth_user_id and email')
        conds.append(survey_table.c.auth_user_id == auth_user_id)
    elif email is not None:
        table = table.join(auth_user_table)
        conds.append(auth_user_table.c.email == email)
    else:
        raise TypeError('You must specify either auth_user_id or email')

    # Order by a column expression instead of the raw string
    # 'sequence_number asc': plain-text ORDER BY fragments are warned
    # about or rejected by newer SQLAlchemy versions.
    query = select([question_table]).select_from(table).where(
        and_(*conds)
    ).order_by(question_table.c.sequence_number.asc())
    return connection.execute(query)
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:32,代码来源:question.py
示例9: get_auth_attempts_at_port
def get_auth_attempts_at_port(connection: Connection,
                              nas_ip_address: netaddr.IPAddress,
                              nas_port_id: str,
                              when: Optional[DatetimeRange]=None,
                              limit: Optional[int]=None) -> Iterable[
        Tuple[str, str, Groups, Attributes, datetime]]:
    """
    Return auth attempts at a particular port of an NAS ordered by Auth-Date
    descending.

    :param connection: A SQLAlchemy connection
    :param nas_ip_address: NAS IP address
    :param nas_port_id: NAS Port ID
    :param when: Range in which Auth-Date must be within
    :param limit: Maximum number of records
    :return: An iterable that yields (User-Name, Packet-Type, Groups, Reply,
             Auth-Date)-tuples ordered by Auth-Date descending
    """
    # Python %-formatting has no positional "%N$s" specifiers, so pass the
    # arguments in the order they appear in the message.
    logger.debug('Getting all auth attempts at port %s of %s',
                 nas_port_id, nas_ip_address)
    query = (
        select([radpostauth.c.UserName, radpostauth.c.PacketType,
                radpostauth.c.Groups, radpostauth.c.Reply,
                radpostauth.c.AuthDate])
        .where(and_(radpostauth.c.NASIPAddress == nas_ip_address,
                    radpostauth.c.NASPortId == nas_port_id))
        .order_by(radpostauth.c.AuthDate.desc())
    )
    if when is not None:
        # where() returns a new statement, so the result must be kept.
        # op('<@') yields a callable that is applied to the right-hand
        # operand: AuthDate <@ tstzrange(...).
        query = query.where(
            radpostauth.c.AuthDate.op('<@')(func.tstzrange(*when)))
    if limit is not None:
        query = query.limit(limit)
    return iter(connection.execute(query))
开发者ID:agdsn,项目名称:hades,代码行数:33,代码来源:db.py
示例10: _return_sql
def _return_sql(connection: Connection,
                result: object,
                survey_id: str,
                auth_user_id: str,
                question_id: str) -> object:
    """
    Pass a query result through, or raise a descriptive error if it is
    empty.

    A result that is neither None nor an empty list is returned untouched.
    Otherwise the owner of the survey is looked up to decide which error
    to raise.

    :param connection: a SQLAlchemy Connection
    :param result: the result of the SQL function
    :param survey_id: the UUID of the survey
    :param auth_user_id: the UUID of the user
    :param question_id: the UUID of the question
    :return: the result of the SQL function
    :raise NoSubmissionsToQuestionError: if the user owns the survey but
                                         there is nothing to report
    :raise QuestionDoesNotExistError: if the user does not own the survey
    """
    has_data = not (result is None or result == [])
    if has_data:
        return result

    owner_query = select([survey_table]).where(
        survey_table.c.survey_id == survey_id)
    # NOTE(review): first() yields None when no survey row matches, in
    # which case the attribute access below fails.
    owner_id = connection.execute(owner_query).first().auth_user_id
    if auth_user_id == owner_id:
        raise NoSubmissionsToQuestionError(question_id)
    raise QuestionDoesNotExistError(question_id)
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:25,代码来源:aggregation.py
示例11: execute_with_exceptions
def execute_with_exceptions(connection: Connection,
                            executable: [Insert, Update],
                            exceptions: Iterator) -> ResultProxy:
    """
    Run an executable on the connection and turn IntegrityErrors into
    more meaningful exceptions.

    When connection.execute() raises an IntegrityError, the text of the
    underlying driver error is searched for each given name in turn; the
    exception paired with the first matching name is raised instead. If
    nothing matches, the original IntegrityError propagates.

    :param connection: the SQLAlchemy connection (for transaction purposes)
    :param executable: the object to pass to connection.execute()
    :param exceptions: an iterable of (name: str, exception: Exception) tuples.
                       name is the string to look for in the IntegrityError,
                       and exception is the Exception to raise instead of
                       IntegrityError
    :return: a SQLAlchemy ResultProxy
    """
    try:
        outcome = connection.execute(executable)
    except IntegrityError as integrity_error:
        message = str(integrity_error.orig)
        for fragment, replacement in exceptions:
            if fragment not in message:
                continue
            raise replacement
        # No known constraint name found: re-raise the IntegrityError.
        raise
    return outcome
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:26,代码来源:__init__.py
示例12: get_stats
def get_stats(connection: Connection,
              survey_id: str,
              email: str) -> dict:
    """
    Report statistics about a survey: its creation time, the number of
    submissions, and the earliest and latest submission times.

    :param connection: a SQLAlchemy Connection
    :param survey_id: the UUID of the survey
    :param email: the e-mail address of the user
    :return: a JSON representation of the statistics.
    """
    # Submissions are outer-joined so a survey without submissions still
    # produces a row.
    joined = auth_user_table.join(survey_table).outerjoin(submission_table)
    selected = [
        survey_table.c.created_on,
        count(submission_table.c.submission_id),
        sqlmin(submission_table.c.submission_time),
        sqlmax(submission_table.c.submission_time)
    ]
    query = select(selected).select_from(
        joined
    ).where(
        survey_table.c.survey_id == survey_id
    ).where(
        auth_user_table.c.email == email
    ).group_by(
        survey_table.c.survey_id
    )
    # NOTE(review): first() yields None when no row matches; the indexing
    # below does not guard against that.
    row = connection.execute(query).first()

    stats = {
        'created_on': maybe_isoformat(row[0]),
        'num_submissions': row[1],
        'earliest_submission_time': maybe_isoformat(row[2]),
        'latest_submission_time': maybe_isoformat(row[3])
    }
    return json_response(stats)
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:35,代码来源:survey.py
示例13: get_sessions_of_mac
def get_sessions_of_mac(connection: Connection, mac: netaddr.EUI,
                        when: Optional[DatetimeRange]=None,
                        limit: Optional[int]=None) -> Iterable[
        Tuple[netaddr.IPAddress, str, datetime, datetime]]:
    """
    Return accounting sessions of a particular MAC address ordered by
    Session-Start-Time descending.

    :param connection: A SQLAlchemy connection
    :param mac: MAC address
    :param when: Range in which Session-Start-Time must be within
    :param limit: Maximum number of records
    :return: An iterable that yields (NAS-IP-Address, NAS-Port-Id,
             Session-Start-Time, Session-Stop-Time)-tuples ordered by
             Session-Start-Time descending
    """
    logger.debug('Getting all sessions for MAC "%s"', mac)
    query = (
        select([radacct.c.NASIPAddress, radacct.c.NASPortId,
                radacct.c.AcctStartTime,
                radacct.c.AcctStopTime])
        .where(and_(radacct.c.UserName == mac))
        .order_by(radacct.c.AcctStartTime.desc())
    )
    if when is not None:
        # where() returns a new statement, so the result must be kept.
        # op('<@') yields a callable that is applied to the right-hand
        # operand: AcctStartTime <@ tstzrange(...).
        query = query.where(
            radacct.c.AcctStartTime.op('<@')(func.tstzrange(*when)))
    if limit is not None:
        query = query.limit(limit)
    return iter(connection.execute(query))
开发者ID:agdsn,项目名称:hades,代码行数:29,代码来源:db.py
示例14: update
def update(connection: Connection, data: dict):
    """
    Create a new version of a survey from the supplied data.

    Inside one transaction the existing survey is renamed so its title
    ends with "(new version created on <time>)", and a new survey is
    created from data. If data has no 'survey_metadata' key, the existing
    survey's metadata is stored into data (the dict is modified in place).

    :param connection: a SQLAlchemy Connection
    :param data: JSON containing the UUID of the survey and fields to update.
    :return: the result of get_one for the newly created survey
    """
    survey_id = data['survey_id']
    email = data['email']
    current = survey_select(connection, survey_id, email=email)
    if 'survey_metadata' not in data:
        data['survey_metadata'] = current.survey_metadata
    timestamp = datetime.datetime.now()

    with connection.begin():
        archived_title = '{} (new version created on {})'.format(
            current.survey_title, timestamp.isoformat())
        rename_stmt = update_record(survey_table, 'survey_id', survey_id,
                                    survey_title=archived_title)
        # A clash on the title/owner constraint is reported as a
        # SurveyAlreadyExistsError.
        replacements = [('survey_title_survey_owner_key',
                         SurveyAlreadyExistsError(archived_title))]
        execute_with_exceptions(connection, rename_stmt, replacements)
        created_id = _create_survey(connection, data)

    return get_one(connection, created_id, email=email)
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:29,代码来源:survey.py
示例15: _jsonify
def _jsonify(connection: Connection,
             answer: object,
             question_id: str) -> object:
    """
    Convert an answer into a representation that can be serialized as JSON.

    The conversion depends on the type constraint of the question:
    location/facility answers become GeoJSON coordinates, date/time
    answers go through maybe_isoformat, decimals become floats, and
    multiple-choice answers are replaced by the text of the choice. Any
    other answer is returned unchanged.

    :param connection: a SQLAlchemy Connection
    :param answer: a submitted value
    :param question_id: the UUID of the question
    :return: the nice representation
    """
    question = question_select(connection, question_id)
    kind = question.type_constraint_name

    if kind in {'location', 'facility'}:
        as_geo_json = connection.execute(func.ST_AsGeoJSON(answer)).scalar()
        return json_decode(as_geo_json)['coordinates']
    if kind in {'date', 'time'}:
        return maybe_isoformat(answer)
    if kind == 'decimal':
        return float(answer)
    if kind == 'multiple_choice':
        # Here the answer is passed to question_choice_select to look up
        # the chosen option.
        return question_choice_select(connection, answer).choice
    return answer
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:26,代码来源:aggregation.py
示例16: get_auth_attempts_of_mac
def get_auth_attempts_of_mac(connection: Connection, mac: netaddr.EUI,
                             when: Optional[DatetimeRange]=None,
                             limit: Optional[int]=None) -> Iterable[
        Tuple[netaddr.IPAddress, str, str, Groups, Attributes, datetime]]:
    """
    Return auth attempts of a particular MAC address ordered by Auth-Date
    descending.

    :param connection: A SQLAlchemy connection
    :param mac: MAC address
    :param when: Range in which Auth-Date must be within
    :param limit: Maximum number of records
    :return: An iterable that yields (NAS-IP-Address, NAS-Port-Id, Packet-Type,
             Groups, Reply, Auth-Date)-tuples ordered by Auth-Date descending
    """
    logger.debug('Getting all auth attempts of MAC %s', mac)
    query = (
        select([radpostauth.c.NASIPAddress, radpostauth.c.NASPortId,
                radpostauth.c.PacketType, radpostauth.c.Groups,
                radpostauth.c.Reply, radpostauth.c.AuthDate])
        .where(and_(radpostauth.c.UserName == mac))
        .order_by(radpostauth.c.AuthDate.desc())
    )
    if when is not None:
        # where() returns a new statement, so the result must be kept.
        # op('<@') yields a callable that is applied to the right-hand
        # operand: AuthDate <@ tstzrange(...).
        query = query.where(
            radpostauth.c.AuthDate.op('<@')(func.tstzrange(*when)))
    if limit is not None:
        query = query.limit(limit)
    return iter(connection.execute(query))
开发者ID:agdsn,项目名称:hades,代码行数:28,代码来源:db.py
示例17: _copy_submission_entries
def _copy_submission_entries(connection: Connection,
                             existing_survey_id: str,
                             new_survey_id: str,
                             email: str) -> tuple:
    """
    Copy the submissions of an existing survey over to its updated copy.

    This is a generator: for each submission of the existing survey a new
    submission row pointing at the new survey is inserted, and the pair
    (old submission id, new submission id) is yielded.

    :param connection: the SQLAlchemy connection used for the transaction
    :param existing_survey_id: the UUID of the existing survey
    :param new_survey_id: the UUID of the survey's updated copy
    :param email: the user's e-mail address
    :return: yields tuples containing the old and new submission IDs
    """
    old_submissions = get_submissions_by_email(
        connection, email,
        survey_id=existing_survey_id
    )
    for old in old_submissions:
        insert_stmt = submission_insert(
            submitter=old.submitter,
            submitter_email=old.submitter_email,
            submission_time=old.submission_time,
            save_time=old.save_time,
            survey_id=new_survey_id)
        inserted = connection.execute(insert_stmt)
        new_id = inserted.inserted_primary_key[0]
        yield old.submission_id, new_id
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:25,代码来源:survey.py
示例18: create_user
def create_user(connection: Connection, data: dict) -> dict:
    """
    Register a new user account unless the e-mail is already known.

    The user is looked up by e-mail first; only if that lookup raises
    UserDoesNotExistError is a new auth user inserted, inside a
    transaction.

    :param connection: a SQLAlchemy Connection
    :param data: a dict holding the user's e-mail under the key 'email'
    :return: a response containing the e-mail and whether it was created or
             already exists in the database
    """
    email = data['email']
    try:
        get_auth_user_by_email(connection, email)
    except UserDoesNotExistError:
        with connection.begin():
            connection.execute(create_auth_user(email=email))
        outcome = 'Created'
    else:
        outcome = 'Already exists'
    return json_response({'email': email, 'response': outcome})
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:17,代码来源:user.py
示例19: refresh_and_diff_materialized_view
def refresh_and_diff_materialized_view(
        connection: Connection, view: Table, copy: Table,
        result_columns: Iterable[Column]) -> Tuple[
            List[Tuple], List[Tuple], List[Tuple]]:
    """
    Refresh a materialized view and report how its contents changed.

    Within a single transaction the view is locked, a temporary copy of
    it is created, the view is refreshed, and the refreshed view is
    diffed against the copy.

    :param connection: A SQLAlchemy connection
    :param view: the materialized view to refresh
    :param copy: the table that receives the pre-refresh copy
    :param result_columns: columns passed through to diff_tables
    :return: the result of diff_tables for the view and its copy
    """
    with connection.begin():
        lock_table(connection, view)
        create_temp_copy(connection, view, copy)
        refresh_materialized_view(connection, view)
        # The diff is computed before the transaction ends.
        changes = diff_tables(connection, view, copy, result_columns)
    return changes
开发者ID:agdsn,项目名称:hades,代码行数:9,代码来源:db.py
示例20: bar_graph
def bar_graph(connection: Connection,
              question_id: str,
              auth_user_id: str=None,
              email: str=None,
              limit: [int, None]=None,
              count_order: bool=False) -> dict:
    """
    Count how often each submitted value occurs for a question. Either an
    auth_user_id or an e-mail address must be provided.

    :param connection: a SQLAlchemy Connection
    :param question_id: the UUID of the question
    :param auth_user_id: the UUID of the user
    :param email: the e-mail address of the user.
    :param limit: a limit on the number of results
    :param count_order: whether to order from largest count to smallest
    :return: a JSON dict containing the [value, count] pairs, tagged with
             the query name 'bar_graph'
    """
    user_id = _get_user_id(connection, auth_user_id, email)

    supported_types = {'text', 'integer', 'decimal', 'multiple_choice',
                       'date', 'time', 'location', 'facility'}
    question = question_select(connection, question_id)
    type_name = _get_type_constraint_name(supported_types, question)

    # Assume that you only want to consider the non-other answers
    answer_table, column_name = _table_and_column(type_name)
    joined = answer_table.join(
        question_table,
        answer_table.c.question_id == question_table.c.question_id
    ).join(survey_table)
    filters = [question_table.c.question_id == question_id,
               survey_table.c.auth_user_id == user_id]

    value_column = get_column(answer_table, column_name)
    counting_query = select(
        [value_column, sqlcount(value_column)]
    ).select_from(joined).group_by(value_column)

    # Sort either by descending frequency or by the value itself.
    if count_order:
        sort_key = desc(sqlcount(value_column))
    else:
        sort_key = value_column
    final_query = counting_query.order_by(sort_key).where(
        and_(*filters)).limit(limit)

    rows = connection.execute(final_query)
    rows = _return_sql(connection, rows, question.survey_id, user_id,
                       question_id)

    pairs = []
    for row in rows:
        pairs.append([_jsonify(connection, row[0], question_id), row[1]])

    # An empty list of pairs makes _return_sql raise the fitting error.
    checked_pairs = _return_sql(connection, pairs, question.survey_id,
                                user_id, question_id)
    response = json_response(checked_pairs)
    response['query'] = 'bar_graph'
    return response
开发者ID:juniorsilver,项目名称:dokomoforms,代码行数:57,代码来源:aggregation.py
注:本文中的sqlalchemy.engine.Connection类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论