本文整理汇总了Python中sqlalchemy.orm.defaultload函数的典型用法代码示例。如果您正苦于以下问题：Python defaultload函数的具体用法？Python defaultload怎么用？Python defaultload使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了defaultload函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: serialize_timetable
def serialize_timetable(self, event, days=None, hide_weekends=False, hide_empty_days=False):
    """Serialize the timetable entries of ``event`` into a dict keyed by day.

    The result maps a ``'%Y%m%d'`` string to a dict of serialized entries.
    Top-level entries are stored directly under their day; entries that
    have a parent are stored under the parent's ``'entries'`` dict.

    :param event: the event whose timetable entries are queried
    :param days: passed to ``iterdays`` as ``day_whitelist``
    :param hide_weekends: passed to ``iterdays`` as ``skip_weekends``
    :param hide_empty_days: if true, the result is passed through
                            ``self._filter_empty_days`` before returning
    :return: the (possibly filtered) timetable dict
    """
    # Pre-create an empty bucket for every day produced by ``iterdays``;
    # entries starting on any other day are skipped below.
    timetable = {
        shown_day.strftime('%Y%m%d'): {}
        for shown_day in iterdays(event.start_dt_local, event.end_dt_local,
                                  skip_weekends=hide_weekends, day_whitelist=days)
    }

    # The return values of the two ``subqueryload`` calls are discarded, exactly
    # as in the original code.
    # NOTE(review): this only has an effect if the loader option records the
    # sub-options on shared state -- confirm for the SQLAlchemy version in use.
    contribution_loading = defaultload('contribution')
    contribution_loading.subqueryload('person_links')
    contribution_loading.subqueryload('references')
    session_block_loading = defaultload('session_block').subqueryload('person_links')

    # Ordering by ``type != SESSION_BLOCK`` -- presumably so that session blocks
    # come first and already exist when their child entries are attached.
    entries = (TimetableEntry.query.with_parent(event)
               .options(contribution_loading, session_block_loading)
               .order_by(TimetableEntry.type != TimetableEntryType.SESSION_BLOCK))

    for entry in entries:
        day_key = entry.start_dt.astimezone(event.tzinfo).date().strftime('%Y%m%d')
        if day_key not in timetable or not entry.can_view(session.user):
            continue
        serialized = self.serialize_timetable_entry(entry, load_children=False)
        entry_key = self._get_entry_key(entry)
        day_bucket = timetable[day_key]
        if entry.parent:
            day_bucket['s{}'.format(entry.parent_id)]['entries'][entry_key] = serialized
        else:
            day_bucket[entry_key] = serialized

    return self._filter_empty_days(timetable) if hide_empty_days else timetable
开发者ID:belokop,项目名称:indico_bare,代码行数:30,代码来源:legacy.py
示例2: undefer_qtys
def undefer_qtys(entity):
    """Return options to undefer the qtys group on a related entity.

    For SQLAlchemy releases older than 1.1.14 the ``used``, ``sold`` and
    ``remaining`` columns are undeferred one by one; for every other
    release ``undefer_group("qtys")`` is used.

    The version check compares numeric tuples.  Comparing the raw version
    strings is lexicographic and misorders releases such as ``"1.1.2"``
    or ``"1.1.9"`` relative to ``"1.1.14"``.

    :param entity: relationship (or path) handed to ``defaultload``
    :return: the loader option built on ``defaultload(entity)``
    """
    if _version_tuple(sqlalchemy.__version__) < (1, 1, 14):
        return (defaultload(entity)
                .undefer("used")
                .undefer("sold")
                .undefer("remaining"))
    return defaultload(entity).undefer_group("qtys")


def _version_tuple(version):
    """Return the leading numeric release components of ``version`` as ints.

    Parsing stops at the first component that is not purely numeric, so
    ``"1.2.0b1"`` yields ``(1, 2, 0)`` and ``"1.4.0.dev0"`` yields
    ``(1, 4, 0)``.
    """
    numbers = []
    for piece in version.split("."):
        digits = ""
        for char in piece:
            if not char.isdigit():
                break
            digits += char
        if not digits:
            break
        numbers.append(int(digits))
        if len(digits) != len(piece):
            # a suffix such as "b1" ends the numeric release segment
            break
    return tuple(numbers)
开发者ID:sde1000,项目名称:quicktill,代码行数:8,代码来源:views.py
示例3: _clone_timetable
def _clone_timetable(self, new_event):
    """Clone the timetable entries of ``self.old_event`` into ``new_event``.

    Every cloned entry has its start time shifted by the difference between
    the start of the new event and the start of the old one.  Parent,
    session block and contribution links are re-pointed through the
    corresponding mappings; breaks are cloned via ``self._clone_break``.

    :param new_event: the event receiving the cloned timetable entries
    """
    time_shift = new_event.start_dt - self.old_event.start_dt
    # no need to copy the type; it's set automatically based on the object
    copied_attrs = get_simple_column_attrs(TimetableEntry) - {'type', 'start_dt'}

    # The results of the chained calls below are discarded, as in the
    # original code.
    # NOTE(review): relies on the loader option recording sub-options on
    # shared state -- confirm for the SQLAlchemy version in use.
    break_loading = defaultload('break_')
    break_loading.joinedload('own_venue')
    break_loading.joinedload('own_room').lazyload('*')
    parent_loading = joinedload('parent').lazyload('*')

    old_entries = (self.old_event.timetable_entries
                   .options(parent_loading, break_loading)
                   .order_by(TimetableEntry.parent_id.is_(None).desc()))

    # iterate over all timetable entries; start with top-level
    # ones so we can build a mapping that can be used once we
    # reach nested entries
    clones = {}
    for source in old_entries:
        clone = TimetableEntry()
        clone.start_dt = source.start_dt + time_shift
        clone.populate_from_attrs(source, copied_attrs)

        old_parent = source.parent
        if old_parent is not None:
            clone.parent = clones[old_parent]

        old_block = source.session_block
        if old_block is not None:
            clone.session_block = self._session_block_map[old_block]

        old_contribution = source.contribution
        if old_contribution is not None:
            clone.contribution = self._contrib_map[old_contribution]

        old_break = source.break_
        if old_break is not None:
            clone.break_ = self._clone_break(old_break)

        new_event.timetable_entries.append(clone)
        clones[source] = clone
开发者ID:indico,项目名称:indico,代码行数:29,代码来源:clone.py
示例4: test_unsafe_unbound_option_cancels_bake
def test_unsafe_unbound_option_cancels_bake(self):
    """Check the lazy-loader bakery size when an aliased ``of_type`` option is used.

    The query is run five times with an unbound ``defaultload().lazyload()``
    option whose target is ``of_type(aliased(SubDingaling))``.  The bakery
    must be empty before the loop and hold exactly one entry afterwards.
    """
    User, Address, Dingaling = self._o2m_twolevel_fixture(lazy="joined")

    class SubDingaling(Dingaling):
        pass

    mapper(SubDingaling, None, inherits=Dingaling)

    lazy_strategy = Address.dingalings.property._lazy_strategy
    bakery_cache = lazy_strategy._bakery(lambda q: None)._bakery
    size_before = len(bakery_cache)

    for _ in range(5):
        session = Session()
        # a fresh alias is created on every iteration
        unsafe_option = defaultload(User.addresses).lazyload(
            Address.dingalings.of_type(aliased(SubDingaling))
        )
        user = session.query(User).options(unsafe_option).first()
        for address in user.addresses:
            # attribute access triggers the lazy load
            address.dingalings

    size_after = len(bakery_cache)
    eq_(size_before, 0)
    eq_(size_after, 1)
开发者ID:vrajmohan,项目名称:sqlalchemy,代码行数:28,代码来源:test_baked.py
示例5: test_load_only_path_specific
def test_load_only_path_specific(self):
    """Check the SQL compiled for path-specific ``load_only`` options.

    ``User`` is mapped with joined-eager ``addresses`` and ``orders``
    relationships; each path gets its own ``load_only`` column list and the
    compiled SELECT is compared against the expected statement.
    """
    classes, tables = self.classes, self.tables
    User, Address, Order = classes.User, classes.Address, classes.Order
    users, addresses, orders = tables.users, tables.addresses, tables.orders

    user_relationships = util.OrderedDict([
        ("addresses", relationship(Address, lazy="joined")),
        ("orders", relationship(Order, lazy="joined")),
    ])
    mapper(User, users, properties=user_relationships)
    mapper(Address, addresses)
    mapper(Order, orders)

    sess = create_session()
    address_columns = (
        load_only("name").defaultload("addresses").load_only("id", "email_address")
    )
    order_columns = defaultload("orders").load_only("id")
    q = sess.query(User).options(address_columns, order_columns)

    # hmmmm joinedload seems to be forcing users.id into here...
    expected_sql = (
        "SELECT users.id AS users_id, users.name AS users_name, "
        "addresses_1.id AS addresses_1_id, "
        "addresses_1.email_address AS addresses_1_email_address, "
        "orders_1.id AS orders_1_id FROM users "
        "LEFT OUTER JOIN addresses AS addresses_1 "
        "ON users.id = addresses_1.user_id "
        "LEFT OUTER JOIN orders AS orders_1 ON users.id = orders_1.user_id"
    )
    self.assert_compile(q, expected_sql)
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:35,代码来源:test_deferred.py
示例6: _test_load_only_propagate
def _test_load_only_propagate(self, use_load):
    """Check that ``load_only`` on a relationship path applies to its lazy loads.

    :param use_load: if true the option is built from ``Load(User)``,
                     otherwise from the unbound ``defaultload``
    """
    User, Address = self.classes.User, self.classes.Address
    users, addresses = self.tables.users, self.tables.addresses

    mapper(User, users, properties={"addresses": relationship(Address)})
    mapper(Address, addresses)

    sess = create_session()

    # the same lazy-load statement is expected once per user
    address_sql = (
        "SELECT addresses.id AS addresses_id, "
        "addresses.email_address AS addresses_email_address "
        "FROM addresses WHERE :param_1 = addresses.user_id"
    )
    expected = [
        ("SELECT users.id AS users_id, users.name AS users_name "
         "FROM users WHERE users.id IN (:id_1, :id_2)", {'id_2': 8, 'id_1': 7}),
        (address_sql, {'param_1': 7}),
        (address_sql, {'param_1': 8}),
    ]

    if use_load:
        address_path = Load(User).defaultload(User.addresses)
    else:
        address_path = defaultload(User.addresses)
    opt = address_path.load_only("id", "email_address")

    q = sess.query(User).options(opt).filter(User.id.in_([7, 8]))

    def go():
        for loaded_user in q:
            # attribute access triggers the lazy load
            loaded_user.addresses

    self.sql_eq_(go, expected)
开发者ID:DeepakAlevoor,项目名称:sqlalchemy,代码行数:34,代码来源:test_deferred.py
示例7: go
def go():
    """Build and compile the same eager-loading query one hundred times."""
    for _ in range(100):
        loader_options = (
            joinedload(A.bs).joinedload(B.cs).joinedload(C.ds),
            joinedload(A.es).joinedload(E.fs),
            defaultload(A.es).joinedload(E.gs),
        )
        q = sess.query(A).options(*loader_options)
        q._compile_context()
开发者ID:BY-jk,项目名称:sqlalchemy,代码行数:8,代码来源:test_orm.py
示例8: _checkParams
def _checkParams(self, params):
    """Load the survey submission named by the ``submission_id`` view argument.

    The base-class check runs first.  The submission is then fetched with
    its survey and the question of each answer joined-loaded and stored in
    ``self.submission``; ``.one()`` is used to run the query.
    """
    RHManageSurveysBase._checkParams(self, params)
    submission_id = request.view_args['submission_id']
    submission_query = SurveySubmission.find(id=submission_id).options(
        defaultload('answers').joinedload('question'),
        joinedload('survey'),
    )
    self.submission = submission_query.one()
开发者ID:hennogous,项目名称:indico,代码行数:8,代码来源:results.py
示例9: _process_args
def _process_args(self):
    """Load the survey submission named by the ``submission_id`` view argument.

    The base-class processing runs first.  The submission is then fetched
    with eager-loading options for its answers' questions, its survey and
    the children of the survey's sections, and stored in
    ``self.submission``.
    """
    RHManageSurveysBase._process_args(self)
    loading_options = (
        defaultload('answers').joinedload('question'),
        joinedload('survey'),
        joinedload('survey').defaultload('sections').joinedload('children'),
    )
    submission_id = request.view_args['submission_id']
    submission_query = SurveySubmission.find(id=submission_id).options(*loading_options)
    self.submission = submission_query.one()
开发者ID:bkolobara,项目名称:indico,代码行数:9,代码来源:results.py
示例10: _checkParams
def _checkParams(self, params):
    """Load the registration named by the ``registration_id`` view argument.

    The base-class check runs first.  The query filters out deleted
    registrations and deleted registration forms, joins the registration
    form and eager-loads the form items' children as well as the field
    data of the registration data.  The result goes to
    ``self.registration``.
    """
    RHManageRegFormBase._checkParams(self, params)

    criteria = (
        Registration.id == request.view_args['registration_id'],
        ~Registration.is_deleted,
        ~RegistrationForm.is_deleted,
    )
    # the joined registration form is reused for loading instead of being
    # fetched again
    form_loading = (contains_eager(Registration.registration_form)
                    .defaultload('form_items')
                    .joinedload('children'))
    data_loading = defaultload(Registration.data).joinedload('field_data')

    registration_query = (Registration.find(*criteria)
                          .join(Registration.registration_form)
                          .options(form_loading)
                          .options(data_loading))
    self.registration = registration_query.one()
开发者ID:MichelCordeiro,项目名称:indico,代码行数:13,代码来源:__init__.py
示例11: test_unbound_cache_key_undefer_group
def test_unbound_cache_key_undefer_group(self):
    """Check the cache key generated for ``defaultload().undefer_group()``."""
    User, Address = self.classes('User', 'Address')

    query_path = self._make_path_registry([User, "addresses"])
    opt = defaultload(User.addresses).undefer_group('xyz')

    expected_key = (
        (Address, 'column:*', ("undefer_group_xyz", True)),
    )
    eq_(opt._generate_cache_key(query_path), expected_key)
开发者ID:zhsj,项目名称:sqlalchemy,代码行数:14,代码来源:test_options.py
示例12: serialize_timetable
def serialize_timetable(self, days=None, hide_weekends=False, strip_empty_days=False):
    """Serialize the timetable entries of ``self.event`` into a dict keyed by day.

    The result maps a ``'%Y%m%d'`` string to a dict of serialized entries.
    Top-level entries are stored directly under their day; entries that
    have a parent are stored under the parent's ``'entries'`` dict.  A
    session block that ends on a different day than it starts is also
    listed under its end day, provided that day is part of the result.

    :param days: passed to ``iterdays`` as ``day_whitelist``
    :param hide_weekends: passed to ``iterdays`` as ``skip_weekends``
    :param strip_empty_days: if true, the result is passed through
                             ``self._strip_empty_days`` before returning
    :return: the (possibly stripped) timetable dict
    """
    tzinfo = self.event.tzinfo if self.management else self.event.display_tzinfo
    self.event.preload_all_acl_entries()

    # Pre-create an empty bucket for every day produced by ``iterdays``;
    # entries starting on any other day are skipped below.
    timetable = {}
    for day in iterdays(self.event.start_dt.astimezone(tzinfo), self.event.end_dt.astimezone(tzinfo),
                        skip_weekends=hide_weekends, day_whitelist=days):
        timetable[day.strftime('%Y%m%d')] = {}

    # The return values of the two ``subqueryload`` calls are discarded, exactly
    # as in the original code.
    # NOTE(review): this only has an effect if the loader option records the
    # sub-options on shared state -- confirm for the SQLAlchemy version in use.
    contributions_strategy = defaultload('contribution')
    contributions_strategy.subqueryload('person_links')
    contributions_strategy.subqueryload('references')
    query_options = (contributions_strategy,
                     defaultload('session_block').subqueryload('person_links'))
    # Ordering by ``type != SESSION_BLOCK`` -- presumably so that session blocks
    # come first and already exist when their child entries are attached.
    query = (TimetableEntry.query.with_parent(self.event)
             .options(*query_options)
             .order_by(TimetableEntry.type != TimetableEntryType.SESSION_BLOCK))

    for entry in query:
        start_date = entry.start_dt.astimezone(tzinfo).date()
        date_str = start_date.strftime('%Y%m%d')
        if date_str not in timetable:
            continue
        if not entry.can_view(self.user):
            continue
        data = self.serialize_timetable_entry(entry, load_children=False)
        key = self._get_entry_key(entry)
        if entry.parent:
            parent_code = 's{}'.format(entry.parent_id)
            timetable[date_str][parent_code]['entries'][key] = data
        else:
            if entry.type == TimetableEntryType.SESSION_BLOCK:
                end_date = entry.end_dt.astimezone(tzinfo).date()
                if end_date != start_date:
                    # If a session block lasts into another day we need to add it to that day, too.
                    # The end day may have been excluded by ``hide_weekends`` or ``days``;
                    # in that case there is no bucket to add it to, so skip it instead of
                    # failing with a KeyError.
                    end_date_str = end_date.strftime('%Y%m%d')
                    if end_date_str in timetable:
                        timetable[end_date_str][key] = data
            timetable[date_str][key] = data

    if strip_empty_days:
        timetable = self._strip_empty_days(timetable)
    return timetable
开发者ID:bkolobara,项目名称:indico,代码行数:37,代码来源:legacy.py
示例13: test_safe_unbound_option_allows_bake
def test_safe_unbound_option_allows_bake(self):
    """Check the lazy-loader bakery size when a plain unbound option is used.

    The query is run five times with ``defaultload().lazyload()`` on plain
    mapped attributes.  The bakery must be empty before the loop and hold
    exactly two entries afterwards.
    """
    User, Address, Dingaling = self._o2m_twolevel_fixture(lazy="joined")

    lazy_strategy = Address.dingalings.property._lazy_strategy
    bakery_cache = lazy_strategy._bakery(lambda q: None)._bakery
    size_before = len(bakery_cache)

    for _ in range(5):
        session = Session()
        safe_option = defaultload(User.addresses).lazyload(Address.dingalings)
        user = session.query(User).options(safe_option).first()
        for address in user.addresses:
            # attribute access triggers the lazy load
            address.dingalings

    size_after = len(bakery_cache)
    eq_(size_before, 0)
    eq_(size_after, 2)
开发者ID:gencer,项目名称:sqlalchemy,代码行数:16,代码来源:test_baked.py
示例14: test_unbound_cache_key_included_safe_w_option
def test_unbound_cache_key_included_safe_w_option(self):
    """Check the cache key for ``defaultload().joinedload(innerjoin=True).defer()``."""
    User, Address, Order, Item, SubItem = self.classes(
        'User', 'Address', 'Order', 'Item', 'SubItem')

    items_loading = defaultload("orders").joinedload("items", innerjoin=True)
    opt = items_loading.defer("description")
    query_path = self._make_path_registry([User, "orders"])

    # one key element for the joined relationship, one for the deferred column
    expected_key = (
        (Order, 'items', Item,
         ('lazy', 'joined'), ('innerjoin', True)),
        (Order, 'items', Item, 'description',
         ('deferred', True), ('instrument', True)),
    )
    eq_(opt._generate_cache_key(query_path), expected_key)
开发者ID:biroc,项目名称:sqlalchemy,代码行数:17,代码来源:test_options.py
示例15: __init__
def __init__(self, occurrences, start_dt, end_dt, candidates=None, rooms=None, specific_room=None,
             repeat_frequency=None, repeat_interval=None, flexible_days=0, show_blockings=True):
    """Store the calendar parameters, resolve the rooms and produce the bars.

    :param rooms: rooms to show; if ``None`` (and no ``specific_room`` is
                  given) all active rooms are looked up
    :param specific_room: a single room to show instead of ``rooms``
    :param show_blockings: if true, accepted blocked rooms and overlapping
                           non-bookable periods are queried for the rooms
    :raises ValueError: if both ``specific_room`` and ``rooms`` are truthy

    The remaining arguments are stored unchanged on the instance.
    """
    self.occurrences = occurrences
    self.start_dt = start_dt
    self.end_dt = end_dt
    self.candidates = candidates
    self.rooms = rooms
    self.specific_room = specific_room
    self.repeat_frequency = repeat_frequency
    self.repeat_interval = repeat_interval
    self.flexible_days = flexible_days
    self.show_blockings = show_blockings
    self.conflicts = 0
    self.bars = []

    if self.specific_room and self.rooms:
        raise ValueError('specific_room and rooms are mutually exclusive')

    if self.specific_room:
        self.rooms = [self.specific_room]
    elif self.rooms is None:
        self.rooms = Room.find_all(is_active=True)
    self.rooms = sorted(self.rooms, key=lambda room: natural_sort_key(room.full_name))

    if not self.show_blockings:
        # NOTE(review): ``nonbookable_periods`` is only assigned in the
        # ``show_blockings`` branch below -- confirm nothing reads it otherwise.
        self.blocked_rooms = []
    else:
        # avoid loading user data we don't care about
        # (the results of ``noload``/``load_only`` are discarded as in the original;
        # NOTE(review): relies on the option recording them on shared state)
        user_strategy = defaultload('blocking').defaultload('created_by_user')
        user_strategy.noload('*')
        user_strategy.load_only('first_name', 'last_name')

        room_ids = [room.id for room in self.rooms]
        filters = {
            'room_ids': room_ids,
            'state': BlockedRoom.State.accepted,
            'start_date': self.start_dt.date(),
            'end_date': self.end_dt.date(),
        }
        blocked_query = BlockedRoom.find_with_filters(filters)
        self.blocked_rooms = blocked_query.options(user_strategy)

        periods_query = NonBookablePeriod.find(
            NonBookablePeriod.room_id.in_(room_ids),
            NonBookablePeriod.overlaps(self.start_dt, self.end_dt),
        )
        self.nonbookable_periods = periods_query.all()

    self._produce_bars()
开发者ID:DirkHoffmann,项目名称:indico,代码行数:46,代码来源:calendar.py
示例16: __init__
def __init__(self, model, query, _as_relation=None):
    """ Set up a MongoDB-style query wrapper

    :param model: The MongoModel to query
    :type model: mongosql.MongoModel
    :param query: The SQLAlchemy query being wrapped
    :type query: sqlalchemy.orm.Query
    :param _as_relation: Parent relationship (internal).
        Given when working with deeper relations: it becomes the initial
        loader path, i.e. defaultload(_as_relation).lazyload(...).
        When it is not given, the path starts at Load(model.model).
    :type _as_relation: sqlalchemy.orm.relationships.RelationshipProperty
    """
    assert isinstance(model, MongoModel)
    assert isinstance(query, Query)

    self._model = model
    self._query = query

    # starting point for every loader option built later on
    if _as_relation:
        self._as_relation = defaultload(_as_relation)
    else:
        self._as_relation = Load(self._model.model)

    self._no_joindefaults = False
开发者ID:RussellLuo,项目名称:py-mongosql,代码行数:19,代码来源:query.py
示例17: test_fetch_results
def test_fetch_results(self):
    """Profile fetching the rows of a pre-compiled eager-loading query.

    The query is compiled once; the profiled ``go`` function then executes
    it and consumes the result one hundred times.
    """
    A, B, C, D, E, F, G = self.classes('A', 'B', 'C', 'D', 'E', 'F', 'G')
    sess = Session()

    loader_options = (
        joinedload(A.bs).joinedload(B.cs).joinedload(C.ds),
        joinedload(A.es).joinedload(E.fs),
        defaultload(A.es).joinedload(E.gs),
    )
    q = sess.query(A).options(*loader_options)
    context = q._compile_context()

    @profiling.function_call_count()
    def go():
        for _ in range(100):
            rows = q._execute_and_instances(context)
            list(rows)
            sess.close()

    go()
开发者ID:anti-social,项目名称:sqlalchemy,代码行数:20,代码来源:test_orm.py
示例18: test_unbound_cache_key_included_safe_w_loadonly_strs
def test_unbound_cache_key_included_safe_w_loadonly_strs(self):
    """Check the cache key for ``defaultload().load_only()`` with string names."""
    User, Address, Order, Item, SubItem = self.classes(
        'User', 'Address', 'Order', 'Item', 'SubItem')

    query_path = self._make_path_registry([User, "addresses"])
    opt = defaultload(User.addresses).load_only("id", "email_address")

    # one element per named column, followed by the wildcard element
    undeferred = (('deferred', False), ('instrument', True))
    expected_key = (
        (Address, 'id') + undeferred,
        (Address, 'email_address') + undeferred,
        (Address, 'column:*',
         ('deferred', True), ('instrument', True),
         ('undefer_pks', True)),
    )
    eq_(opt._generate_cache_key(query_path), expected_key)
开发者ID:biroc,项目名称:sqlalchemy,代码行数:20,代码来源:test_options.py
示例19: test_undefer_group_from_relationship_lazyload
def test_undefer_group_from_relationship_lazyload(self):
    """Check ``undefer_group`` applied through ``defaultload`` on a lazy relationship.

    ``Order`` maps three columns as deferred in the ``primary`` group.  With
    the option in place, the lazy load of ``User.orders`` is expected to
    select those columns, so only two statements are emitted in total.
    """
    users, orders = self.tables.users, self.tables.orders
    Order, User = self.classes.Order, self.classes.User

    mapper(User, users, properties=dict(
        orders=relationship(Order, order_by=orders.c.id)))

    deferred_columns = util.OrderedDict([
        ('userident', deferred(orders.c.user_id, group='primary')),
        ('description', deferred(orders.c.description,
                                 group='primary')),
        ('opened', deferred(orders.c.isopen, group='primary')),
    ])
    mapper(Order, orders, properties=deferred_columns)

    sess = create_session()
    undefer_option = defaultload(User.orders).undefer_group('primary')
    q = sess.query(User).filter(User.id == 7).options(undefer_option)

    def go():
        loaded_users = q.all()
        second_order = loaded_users[0].orders[1]
        eq_(second_order.opened, 1)
        eq_(second_order.userident, 7)
        eq_(second_order.description, 'order 3')

    expected_statements = [
        ("SELECT users.id AS users_id, users.name AS users_name "
         "FROM users WHERE users.id = :id_1", {"id_1": 7}),
        ("SELECT orders.user_id AS orders_user_id, orders.description "
         "AS orders_description, orders.isopen AS orders_isopen, "
         "orders.id AS orders_id, orders.address_id AS orders_address_id "
         "FROM orders WHERE :param_1 = orders.user_id ORDER BY orders.id",
         {'param_1': 7}),
    ]
    self.sql_eq_(go, expected_statements)
开发者ID:cpcloud,项目名称:sqlalchemy,代码行数:37,代码来源:test_deferred.py
示例20: _process_args
def _process_args(self):
    """Load the non-deleted registration form named by the ``reg_form_id`` view argument.

    The children of the form items and the children's current data are
    joined-loaded.  The result goes to ``self.regform``; ``.one()`` is used
    to run the query.
    """
    children_loading = (defaultload('form_items')
                        .joinedload('children')
                        .joinedload('current_data'))
    regform_id = request.view_args['reg_form_id']
    regform_query = (RegistrationForm.query
                     .filter_by(id=regform_id, is_deleted=False)
                     .options(children_loading))
    self.regform = regform_query.one()
开发者ID:bkolobara,项目名称:indico,代码行数:5,代码来源:__init__.py
注：本文中的sqlalchemy.orm.defaultload函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论