本文整理汇总了Python中storyboard.db.api.base.model_query函数的典型用法代码示例。如果您正苦于以下问题:Python model_query函数的具体用法?Python model_query怎么用?Python model_query使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了model_query函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: project_group_get_all
def project_group_get_all(marker=None, limit=None, offset=None,
subscriber_id=None, sort_field=None, sort_dir=None,
**kwargs):
# Sanity checks, in case someone accidentally explicitly passes in 'None'
if not sort_field:
sort_field = 'id'
if not sort_dir:
sort_dir = 'asc'
query = api_base.model_query(models.ProjectGroup)
query = api_base.apply_query_filters(query=query,
model=models.ProjectGroup,
**kwargs)
# Filter by subscriber ID
if subscriber_id is not None:
subs = api_base.model_query(models.Subscription)
subs = api_base.apply_query_filters(query=subs,
model=models.Subscription,
target_type='project_group',
user_id=subscriber_id)
subs = subs.subquery()
query = query.join(subs, subs.c.target_id == models.ProjectGroup.id)
query = api_base.paginate_query(query=query,
model=models.ProjectGroup,
limit=limit,
sort_key=sort_field,
marker=marker,
offset=offset,
sort_dir=sort_dir)
# Execute the query
return query.all()
开发者ID:openstack-infra,项目名称:storyboard,代码行数:34,代码来源:project_groups.py
示例2: __init__
def __init__(self, project_name):
"""Create a new instance of the launchpad-to-storyboard data writer.
"""
# username -> openid
self._openid_map = dict()
# openid -> SB User Entity
self._user_map = dict()
# tag_name -> SB StoryTag Entity
self._tag_map = dict()
# Build a session for the writer
self.session = db_api.get_session(in_request=False)
# SB Project Entity + Sanity check.
self.project = db_api.model_query(Project, self.session) \
.filter_by(name=project_name) \
.first()
if not self.project:
print("Local project %s not found in storyboard, please create \
it first." % (project_name))
sys.exit(1)
self.branch = db_api.model_query(Branch, self.session) \
.filter_by(project_id=self.project.id, name='master') \
.first()
if not self.branch:
print("No master branch found for project %s, please create \
one first." % (project_name))
sys.exit(1)
开发者ID:openstack-infra,项目名称:storyboard,代码行数:29,代码来源:writer.py
示例3: story_get_count
def story_get_count(title=None, description=None, status=None,
assignee_id=None, creator_id=None, project_group_id=None,
project_id=None, subscriber_id=None, tags=None,
tags_filter_type="all"):
query = _story_build_query(title=title,
description=description,
assignee_id=assignee_id,
creator_id=creator_id,
project_group_id=project_group_id,
project_id=project_id,
tags=tags,
tags_filter_type=tags_filter_type)
# Filter by subscriber ID
if subscriber_id is not None:
subs = api_base.model_query(models.Subscription)
subs = api_base.apply_query_filters(query=subs,
model=models.Subscription,
target_type='story',
user_id=subscriber_id)
subs = subs.subquery()
query = query.join(subs, subs.c.target_id == models.Story.id)
# If we're also asking for status, we have to attach storysummary here,
# since story status is derived.
if status:
query = query.subquery()
summary_query = api_base.model_query(models.StorySummary)
summary_query = summary_query \
.join(query, models.StorySummary.id == query.c.id)
query = summary_query.filter(models.StorySummary.status.in_(status))
return query.count()
开发者ID:ColdrickSotK,项目名称:storyboard,代码行数:33,代码来源:stories.py
示例4: story_get_all
def story_get_all(title=None, description=None, status=None, assignee_id=None,
creator_id=None, project_group_id=None, project_id=None,
subscriber_id=None, tags=None, marker=None, offset=None,
limit=None, tags_filter_type="all", sort_field='id',
sort_dir='asc', current_user=None):
# Sanity checks, in case someone accidentally explicitly passes in 'None'
if not sort_field:
sort_field = 'id'
if not sort_dir:
sort_dir = 'asc'
if not isinstance(status, list) and status is not None:
status = [status]
# Build the query.
subquery = _story_build_query(title=title,
description=description,
assignee_id=assignee_id,
creator_id=creator_id,
project_group_id=project_group_id,
project_id=project_id,
tags=tags,
tags_filter_type=tags_filter_type,
current_user=current_user)
# Filter by subscriber ID
if subscriber_id is not None:
subs = api_base.model_query(models.Subscription)
subs = api_base.apply_query_filters(query=subs,
model=models.Subscription,
target_type='story',
user_id=subscriber_id)
subs = subs.subquery()
subquery = subquery.join(subs, subs.c.target_id == models.Story.id)
# Turn the whole shebang into a subquery.
subquery = subquery.subquery('filtered_stories')
# Return the story summary.
query = api_base.model_query(models.StorySummary)\
.options(subqueryload(models.StorySummary.tags))
query = query.join(subquery,
models.StorySummary.id == subquery.c.id)
if status:
query = query.filter(models.StorySummary.status.in_(status))
# paginate the query
query = api_base.paginate_query(query=query,
model=models.StorySummary,
limit=limit,
sort_key=sort_field,
marker=marker,
offset=offset,
sort_dir=sort_dir)
raw_stories = query.all()
return raw_stories
开发者ID:palvarez89,项目名称:storyboard,代码行数:58,代码来源:stories.py
示例5: filter_stories
def filter_stories(worklist, filters):
filter_queries = []
for filter in filters:
subquery = api_base.model_query(models.Story.id).distinct().subquery()
query = api_base.model_query(models.StorySummary)
query = query.join(subquery, models.StorySummary.id == subquery.c.id)
query = query.outerjoin(models.Task,
models.Project,
models.project_group_mapping,
models.ProjectGroup)
for criterion in filter.criteria:
attr = translate_criterion_to_field(criterion)
if hasattr(models.StorySummary, attr):
model = models.StorySummary
else:
if attr in ('assignee_id', 'project_id'):
model = models.Task
elif attr == 'project_group_id':
model = models.ProjectGroup
attr = 'id'
else:
continue
if attr == 'tags':
if criterion.negative:
query = query.filter(
~models.StorySummary.tags.any(
models.StoryTag.name.in_([criterion.value])))
else:
query = query.filter(
models.StorySummary.tags.any(
models.StoryTag.name.in_([criterion.value])))
continue
if criterion.negative:
query = query.filter(
getattr(model, attr) != criterion.value)
else:
query = query.filter(
getattr(model, attr) == criterion.value)
filter_queries.append(query)
if len(filter_queries) > 1:
query = filter_queries[0]
query = query.union(*filter_queries[1:])
return query.all()
elif len(filter_queries) == 1:
return filter_queries[0].all()
else:
return []
开发者ID:palvarez89,项目名称:storyboard,代码行数:50,代码来源:worklists.py
示例6: story_get
def story_get(story_id, session=None):
story_query = api_base.model_query(models.StorySummary, session)
story_summary = story_query\
.options(subqueryload(models.StorySummary.tags))\
.filter_by(id=story_id).first()
return story_summary
开发者ID:ColdrickSotK,项目名称:storyboard,代码行数:7,代码来源:stories.py
示例7: filter_tasks
def filter_tasks(worklist, filters):
filter_queries = []
for filter in filters:
query = api_base.model_query(models.Task)
query = query.outerjoin(models.Project,
models.project_group_mapping,
models.ProjectGroup)
for criterion in filter.criteria:
attr = translate_criterion_to_field(criterion)
if hasattr(models.Task, attr):
model = models.Task
elif attr == 'project_group_id':
model = models.ProjectGroup
attr = 'id'
else:
continue
if criterion.negative:
query = query.filter(getattr(model, attr) != criterion.value)
else:
query = query.filter(getattr(model, attr) == criterion.value)
filter_queries.append(query)
if len(filter_queries) > 1:
query = filter_queries[0]
query = query.union(*filter_queries[1:])
return query.all()
elif len(filter_queries) == 1:
return filter_queries[0].all()
else:
return []
开发者ID:palvarez89,项目名称:storyboard,代码行数:30,代码来源:worklists.py
示例8: run
def run(self):
"""Remove all oauth tokens that are more than a week old.
"""
# Calculate last week.
lastweek = datetime.now(pytz.utc) - timedelta(weeks=1)
LOG.debug("Removing Expired OAuth Tokens: %s" % (lastweek,))
# Build the session.
session = api_base.get_session(in_request=False,
autocommit=False,
expire_on_commit=True)
try:
query = api_base.model_query(AccessToken, session)
# Apply the filter.
query = query.filter(AccessToken.expires_at < lastweek)
# Manually deleting each record, because batch deletes are an
# exception to ORM Cascade markup.
for token in query.all():
session.delete(token)
session.commit()
except Exception:
session.rollback()
开发者ID:ColdrickSotK,项目名称:storyboard,代码行数:25,代码来源:cleaner.py
示例9: task_build_query
def task_build_query(project_group_id, current_user=None, **kwargs):
# Construct the query
query = api_base.model_query(models.Task)
if project_group_id:
query = query.join(models.Project,
models.project_group_mapping,
models.ProjectGroup) \
.filter(models.ProjectGroup.id == project_group_id)
# Sanity check on input parameters
query = api_base.apply_query_filters(query=query,
model=models.Task,
**kwargs)
# Filter out tasks or stories that the current user can't see
query = query.outerjoin(models.Story,
models.story_permissions,
models.Permission,
models.user_permissions,
models.User)
if current_user is not None:
query = query.filter(
or_(
and_(
models.User.id == current_user,
models.Story.private == true()
),
models.Story.private == false()
)
)
else:
query = query.filter(models.Story.private == false())
return query
开发者ID:palvarez89,项目名称:storyboard,代码行数:35,代码来源:tasks.py
示例10: project_get_all
def project_get_all(marker=None, offset=None, limit=None, sort_field=None,
sort_dir=None, project_group_id=None, subscriber_id=None,
**kwargs):
# Sanity checks, in case someone accidentally explicitly passes in 'None'
if not sort_field:
sort_field = 'id'
if not sort_dir:
sort_dir = 'asc'
# Construct the query
query = project_build_query(project_group_id=project_group_id,
**kwargs)
# Filter by subscriber ID
if subscriber_id is not None:
subs = api_base.model_query(models.Subscription)
subs = api_base.apply_query_filters(query=subs,
model=models.Subscription,
user_id=subscriber_id)
# Filter by exact match, to avoid matching "project_group"
subs = subs.filter(models.Subscription.target_type == 'project')
subs = subs.subquery()
query = query.join(subs, subs.c.target_id == models.Project.id)
query = api_base.paginate_query(query=query,
model=models.Project,
limit=limit,
sort_key=sort_field,
marker=marker,
offset=offset,
sort_dir=sort_dir)
# Execute the query
return query.all()
开发者ID:openstack-infra,项目名称:storyboard,代码行数:34,代码来源:projects.py
示例11: projects_query
def projects_query(self, q, sort_dir=None, marker=None, limit=None):
session = api_base.get_session()
query = api_base.model_query(models.Project, session)
query = self._build_fulltext_search(models.Project, query, q)
query = self._apply_pagination(models.Project, query, marker, limit)
return query.all()
开发者ID:devcurmudgeon,项目名称:storyboard,代码行数:7,代码来源:sqlalchemy_impl.py
示例12: users_query
def users_query(self, q, marker=None, offset=None, limit=None,
filter_non_public=False, **kwargs):
session = api_base.get_session()
clean_query = api_base.model_query(models.User, session)
try:
query = self._build_fulltext_search(models.User, clean_query, q)
query = self._apply_pagination(
models.User, query, marker, offset, limit)
users = query.all()
except DBError:
query = self._build_fulltext_search(models.User, clean_query, q,
mode=FullTextMode.NATURAL)
query = self._apply_pagination(
models.User, query, marker, offset, limit)
users = query.all()
if filter_non_public:
users = [
api_base._filter_non_public_fields(user, user._public_fields)
for user in users
]
return users
开发者ID:openstack-infra,项目名称:storyboard,代码行数:25,代码来源:sqlalchemy_impl.py
示例13: build_tag
def build_tag(self, tag_name):
"""Retrieve the SQLAlchemy record for the given tag name, creating it
if necessary.
:param tag_name: Name of the tag to retrieve and/or create.
:return: The SQLAlchemy entity corresponding to the tag name.
"""
if tag_name not in self._tag_map:
# Does it exist in the database?
tag = db_api.model_query(StoryTag, self.session) \
.filter_by(name=tag_name) \
.first()
if not tag:
# Go ahead and create it.
print("Importing tag '%s'" % tag_name)
tag = db_api.entity_create(StoryTag, {
'name': tag_name
}, session=self.session)
# Add it to our memory cache
self._tag_map[tag_name] = tag
return self._tag_map[tag_name]
开发者ID:openstack-infra,项目名称:storyboard,代码行数:25,代码来源:writer.py
示例14: tasks_query
def tasks_query(self, q, marker=None, offset=None, limit=None,
current_user=None, **kwargs):
session = api_base.get_session()
query = api_base.model_query(models.Task, session)
# Filter out tasks or stories that the current user can't see
query = query.outerjoin(models.Story,
models.story_permissions,
models.Permission,
models.user_permissions,
models.User)
if current_user is not None:
query = query.filter(
or_(
and_(
models.User.id == current_user,
models.Story.private == true()
),
models.Story.private == false()
)
)
else:
query = query.filter(models.Story.private == false())
query = self._build_fulltext_search(models.Task, query, q)
query = self._apply_pagination(
models.Task, query, marker, offset, limit)
return query.all()
开发者ID:palvarez89,项目名称:storyboard,代码行数:29,代码来源:sqlalchemy_impl.py
示例15: _story_fulltext_query
def _story_fulltext_query(self, query, q, status, marker, offset,
limit, mode, sort_field, sort_dir):
clean_query = self._build_fulltext_search(
models.Story, query, q, mode=mode)
# Turn the whole shebang into a subquery.
clean_query = clean_query.subquery('filtered_stories')
# Return the story summary.
query = api_base.model_query(models.StorySummary)\
.options(subqueryload(models.StorySummary.tags))
id_col = tuple(clean_query.c)[0]
query = query.join(clean_query,
models.StorySummary.id == id_col)
if status:
query = query.filter(models.StorySummary.status.in_(status))
query = self._apply_pagination(models.StorySummary,
query,
marker,
offset,
limit,
sort_field=sort_field,
sort_dir=sort_dir)
return query.all()
开发者ID:openstack-infra,项目名称:storyboard,代码行数:27,代码来源:sqlalchemy_impl.py
示例16: users_query
def users_query(self, q, marker=None, limit=None, **kwargs):
session = api_base.get_session()
query = api_base.model_query(models.User, session)
query = self._build_fulltext_search(models.User, query, q)
query = self._apply_pagination(models.User, query, marker, limit)
return query.all()
开发者ID:devcurmudgeon,项目名称:storyboard,代码行数:7,代码来源:sqlalchemy_impl.py
示例17: authorization_code_get
def authorization_code_get(code):
query = api_base.model_query(models.AuthorizationCode,
api_base.get_session())
# The query string parser always gives a list, but the database
# wants a single value.
if isinstance(code, list):
code = code[0]
return query.filter_by(code=code).first()
开发者ID:openstack-infra,项目名称:storyboard,代码行数:8,代码来源:auth_codes.py
示例18: check_branch
def check_branch(self, branch):
#Look in db for branches that are in project
#if branch is in project return True
exists = (db_api.model_query(Branch, self.session)
.filter_by(project_id=self.project.id)
.filter_by(name=branch).all())
return exists
开发者ID:openstack-infra,项目名称:storyboard,代码行数:8,代码来源:writer.py
示例19: user_token_get_count
def user_token_get_count(**kwargs):
query = api_base.model_query(models.AccessToken)
query = api_base.apply_query_filters(query=query,
model=models.AccessToken,
**kwargs)
return query.count()
开发者ID:ColdrickSotK,项目名称:storyboard,代码行数:8,代码来源:user_tokens.py
示例20: write_user
def write_user(self, lp_user):
"""Writes a launchpad user record into our user cache, resolving the
openid if necessary.
:param lp_user: The launchpad user record.
:return: The SQLAlchemy entity for the user record.
"""
try:
display_name = lp_user.display_name
user_link = lp_user.web_link
except errors.HTTPError:
display_name = "Disabled Launchpad User"
user_link = "000000000000000000000"
# Resolve the openid.
if user_link not in self._openid_map:
try:
openid_consumer = consumer.Consumer(
dict(id=cryptutil.randomString(16, '0123456789abcdef')),
None)
openid_request = openid_consumer.begin(user_link)
openid = openid_request.endpoint.getLocalID()
openid = openid.replace(
'login.launchpad.net', 'login.ubuntu.com')
self._openid_map[user_link] = openid
except DiscoveryFailure:
# If we encounter a launchpad maintenance user,
# give it an invalid openid.
print("WARNING: Invalid OpenID for user \'%s\'"
% (display_name,))
self._openid_map[user_link] = \
'http://example.com/invalid/~%s' % (display_name,)
openid = self._openid_map[user_link]
# Resolve the user record from the openid.
if openid not in self._user_map:
# Check for the user, create if new.
user = db_api.model_query(User, self.session) \
.filter_by(openid=openid) \
.first()
if not user:
print("Importing user '%s'" % (user_link))
# Use a temporary email address, since LP won't give this to
# us and it'll be updated on first login anyway.
user = db_api.entity_create(User, {
'openid': openid,
'full_name': display_name,
'email': "%s-%[email protected]" % (display_name, uuid.uuid4())
}, session=self.session)
self._user_map[openid] = user
return self._user_map[openid]
开发者ID:openstack-infra,项目名称:storyboard,代码行数:58,代码来源:writer.py
注:本文中的storyboard.db.api.base.model_query函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论