本文整理汇总了Python中storyboard.db.api.base.get_session函数的典型用法代码示例。如果您正苦于以下问题:Python get_session函数的具体用法?Python get_session怎么用?Python get_session使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_session函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_cli
def test_cli(self):
    """Run the project loader twice and check the stored projects.

    After the first run every stored project must carry one of the
    expected names. After the second run every stored project id must
    already have been seen after the first run.
    """
    # Loaded + mock_data
    expected_names = ["Test-Project", "Test-Project-Two",
                      "project1", "project2", "tests/project3"]
    seen_ids = []

    with mock.patch.object(sys, 'argv', self.argv):
        cli.main()

        first_session = db_api_base.get_session()
        groups = first_session.query(models.ProjectGroup).all()
        loaded = first_session.query(models.Project).all()
        self.assertIsNotNone(groups)
        self.assertIsNotNone(loaded)

        for entry in loaded:
            self.assertIn(entry.name, expected_names)
            seen_ids.append(entry.id)
            # Removing the name makes a duplicate project name fail the
            # assertIn above on its second occurrence.
            expected_names.remove(entry.name)

        # call again and nothing should change
        cli.main()

        second_session = db_api_base.get_session()
        reloaded = second_session.query(models.Project).all()
        self.assertIsNotNone(reloaded)
        for entry in reloaded:
            self.assertIn(entry.id, seen_ids)
开发者ID:openstack-infra,项目名称:storyboard,代码行数:28,代码来源:test_load_projects.py
示例2: run
def run(self):
    """Remove all oauth tokens that expired more than a week ago.

    Every ``AccessToken`` whose ``expires_at`` is earlier than one week
    before the current UTC time is deleted, and the deletions are
    committed together. If anything fails, the session is rolled back and
    the error is logged; the exception is not re-raised.
    """
    # Calculate last week.
    lastweek = datetime.now(pytz.utc) - timedelta(weeks=1)
    # Pass the value as a log argument so formatting only happens when
    # debug logging is enabled.
    LOG.debug("Removing Expired OAuth Tokens: %s", lastweek)

    # Build the session.
    session = api_base.get_session(in_request=False,
                                   autocommit=False,
                                   expire_on_commit=True)
    try:
        query = api_base.model_query(AccessToken, session)

        # Apply the filter.
        query = query.filter(AccessToken.expires_at < lastweek)

        # Manually deleting each record, because batch deletes are an
        # exception to ORM Cascade markup.
        for token in query.all():
            session.delete(token)

        session.commit()
    except Exception:
        # Best-effort cleanup: undo the partial work, but leave a trace of
        # the failure instead of swallowing it silently.
        session.rollback()
        LOG.exception("Failed to remove expired OAuth tokens; "
                      "transaction rolled back.")
开发者ID:ColdrickSotK,项目名称:storyboard,代码行数:25,代码来源:cleaner.py
示例3: tasks_query
def tasks_query(self, q, marker=None, offset=None, limit=None,
                current_user=None, **kwargs):
    """Run a full-text task search limited to stories the user may see.

    :param q: Search expression handed to ``_build_fulltext_search``.
    :param marker: Pagination marker handed to ``_apply_pagination``.
    :param offset: Pagination offset handed to ``_apply_pagination``.
    :param limit: Pagination limit handed to ``_apply_pagination``.
    :param current_user: Id compared against ``models.User.id``; when
        ``None`` only tasks of non-private stories are returned.
    :param kwargs: Accepted but not used.
    :return: The result of ``query.all()``.
    """
    db_session = api_base.get_session()
    base_query = api_base.model_query(models.Task, db_session)

    # Filter out tasks or stories that the current user can't see
    joined = base_query.outerjoin(models.Story,
                                  models.story_permissions,
                                  models.Permission,
                                  models.user_permissions,
                                  models.User)

    story_is_public = models.Story.private == false()
    if current_user is None:
        visibility = story_is_public
    else:
        story_is_own_private = and_(
            models.User.id == current_user,
            models.Story.private == true()
        )
        visibility = or_(story_is_own_private, story_is_public)
    visible = joined.filter(visibility)

    searched = self._build_fulltext_search(models.Task, visible, q)
    paged = self._apply_pagination(
        models.Task, searched, marker, offset, limit)
    return paged.all()
开发者ID:palvarez89,项目名称:storyboard,代码行数:29,代码来源:sqlalchemy_impl.py
示例4: downgrade
def downgrade(active_plugins=None, options=None):
    """Restore ``permission_id`` columns and drop the permission link tables.

    Adds a nullable ``permission_id`` column plus foreign key to both
    ``worklists`` and ``boards``, collects every permission reachable from
    existing boards and worklists, drops ``worklist_permissions`` and
    ``board_permissions``, and finally hard-deletes the collected
    permissions.
    """
    def _permission_id_column():
        # A fresh Column object is built for each table it is added to.
        return sa.Column('permission_id',
                         mysql.INTEGER(display_width=11),
                         autoincrement=False,
                         nullable=True)

    op.add_column(u'worklists', _permission_id_column())
    op.create_foreign_key(u'worklists_ibfk_2', 'worklists', 'permissions',
                          ['permission_id'], ['id'])
    op.add_column(u'boards', _permission_id_column())
    op.create_foreign_key(u'boards_ibfk_2', 'boards', 'permissions',
                          ['permission_id'], ['id'])

    session = api_base.get_session(in_request=False)

    # Collect the permissions before the link tables are dropped; boards
    # first, then worklists.
    stale_permissions = [
        permission
        for board in boards.get_all(session=session)
        for permission in board.permissions
    ]
    stale_permissions.extend(
        permission
        for worklist in worklists.get_all(session=session)
        for permission in worklist.permissions
    )

    op.drop_table('worklist_permissions')
    op.drop_table('board_permissions')

    for stale in stale_permissions:
        api_base.entity_hard_delete(
            models.Permission, stale.id, session=session)
开发者ID:openstack-infra,项目名称:storyboard,代码行数:28,代码来源:050_add_detailed_permissions_to_boards_and_.py
示例5: project_group_delete_project
def project_group_delete_project(project_group_id, project_id):
    """Detach a project from a project group and return the group.

    :raises exc.NotFound: if the group or the project cannot be loaded.
    :raises ClientSideError: if the project is not a member of the group.
    """
    session = api_base.get_session()

    with session.begin(subtransactions=True):
        group = _entity_get(project_group_id, session)
        if group is None:
            raise exc.NotFound(_("%(name)s %(id)s not found")
                               % {'name': "Project Group",
                                  'id': project_group_id})

        if projects.project_get(project_id) is None:
            raise exc.NotFound(_("%(name)s %(id)s not found")
                               % {'name': "Project",
                                  'id': project_id})

        # Members of the group whose id matches the requested project.
        matching = [member for member in group.projects
                    if member.id == project_id]
        if not matching:
            raise ClientSideError(_("The Project %(id)d is not in "
                                    "Project Group %(group_id)d") %
                                  {'id': project_id,
                                   'group_id': project_group_id})

        group.projects.remove(matching[0])
        session.add(group)

    return group
开发者ID:openstack-infra,项目名称:storyboard,代码行数:28,代码来源:project_groups.py
示例6: test_handle
def test_handle(self):
    """Assert that the handle method passes the correct values on."""
    worker = MockEmailWorkerBase({})

    with base.HybridSessionManager():
        db_session = db_api_base.get_session()
        author = db_api_base.entity_get(models.User, 1, session=db_session)

        worker.handle(session=db_session,
                      author=author,
                      method='POST',
                      url='http://localhost/',
                      path='/test',
                      query_string='',
                      status=201,
                      resource='story',
                      resource_id=1)

        handled = worker.handled_values
        self.assertIsInstance(handled['author'], models.User)
        self.assertEqual(1, handled['author'].id)
        self.assertEqual(2, len(handled['subscribers']))

        # Plain values that must be passed through unchanged.
        passthrough = (
            ('method', 'POST'),
            ('status', 201),
            ('path', '/test'),
            ('resource', 'story'),
            ('resource_id', 1),
        )
        for key, expected in passthrough:
            self.assertEqual(expected, handled[key])
开发者ID:ColdrickSotK,项目名称:storyboard,代码行数:28,代码来源:test_workers.py
示例7: event
def event(self, author_id, method, url, path, query_string, status,
          resource, resource_id, sub_resource=None, sub_resource_id=None,
          resource_before=None, resource_after=None):
    """Handle an event.

    A database session is created and a transaction is opened on it. The
    author is resolved from ``author_id`` through
    ``resolve_resource_by_name``, and everything is then forwarded to
    ``self.handle`` as keyword arguments.
    """
    session = db_api.get_session(in_request=False)

    with session.begin(subtransactions=True):
        # The author is resolved inside the transaction, before handle().
        handle_kwargs = {
            'session': session,
            'author': self.resolve_resource_by_name(
                session, 'user', author_id),
            'method': method,
            'url': url,
            'path': path,
            'query_string': query_string,
            'status': status,
            'resource': resource,
            'resource_id': resource_id,
            'sub_resource': sub_resource,
            'sub_resource_id': sub_resource_id,
            'resource_before': resource_before,
            'resource_after': resource_after,
        }
        self.handle(**handle_kwargs)
开发者ID:ColdrickSotK,项目名称:storyboard,代码行数:25,代码来源:event_worker.py
示例8: users_query
def users_query(self, q, marker=None, offset=None, limit=None,
                filter_non_public=False, **kwargs):
    """Run a full-text user search, retrying in NATURAL mode on DBError.

    :param q: Search expression handed to ``_build_fulltext_search``.
    :param marker: Pagination marker handed to ``_apply_pagination``.
    :param offset: Pagination offset handed to ``_apply_pagination``.
    :param limit: Pagination limit handed to ``_apply_pagination``.
    :param filter_non_public: When true, each result is passed through
        ``api_base._filter_non_public_fields`` with the user's
        ``_public_fields``.
    :param kwargs: Accepted but not used.
    :return: A list of users.
    """
    db_session = api_base.get_session()
    clean_query = api_base.model_query(models.User, db_session)

    def _search(**search_options):
        # Always starts from the untouched base query so that a retry is
        # not affected by the failed first attempt.
        matched = self._build_fulltext_search(
            models.User, clean_query, q, **search_options)
        paged = self._apply_pagination(
            models.User, matched, marker, offset, limit)
        return paged.all()

    try:
        users = _search()
    except DBError:
        users = _search(mode=FullTextMode.NATURAL)

    if not filter_non_public:
        return users
    return [
        api_base._filter_non_public_fields(user, user._public_fields)
        for user in users
    ]
开发者ID:openstack-infra,项目名称:storyboard,代码行数:25,代码来源:sqlalchemy_impl.py
示例9: _entity_get_query
def _entity_get_query(session=None):
    """Build a ProjectGroup query that subquery-loads ``projects``.

    :param session: Session to query with; a new one is obtained from
        ``api_base.get_session()`` when this is falsy.
    :return: The query object (not executed).
    """
    active_session = session or api_base.get_session()
    eager_projects = subqueryload(models.ProjectGroup.projects)
    return active_session.query(models.ProjectGroup).options(eager_projects)
开发者ID:openstack-infra,项目名称:storyboard,代码行数:7,代码来源:project_groups.py
示例10: projects_query
def projects_query(self, q, sort_dir=None, marker=None, limit=None):
    """Run a paginated full-text search over projects.

    :param q: Search expression handed to ``_build_fulltext_search``.
    :param sort_dir: Accepted but not used.
    :param marker: Pagination marker handed to ``_apply_pagination``.
    :param limit: Pagination limit handed to ``_apply_pagination``.
    :return: The result of ``query.all()``.
    """
    db_session = api_base.get_session()
    all_projects = api_base.model_query(models.Project, db_session)
    matched = self._build_fulltext_search(models.Project, all_projects, q)
    paged = self._apply_pagination(models.Project, matched, marker, limit)
    return paged.all()
开发者ID:devcurmudgeon,项目名称:storyboard,代码行数:7,代码来源:sqlalchemy_impl.py
示例11: users_query
def users_query(self, q, marker=None, limit=None, **kwargs):
    """Run a paginated full-text search over users.

    :param q: Search expression handed to ``_build_fulltext_search``.
    :param marker: Pagination marker handed to ``_apply_pagination``.
    :param limit: Pagination limit handed to ``_apply_pagination``.
    :param kwargs: Accepted but not used.
    :return: The result of ``query.all()``.
    """
    db_session = api_base.get_session()
    all_users = api_base.model_query(models.User, db_session)
    matched = self._build_fulltext_search(models.User, all_users, q)
    paged = self._apply_pagination(models.User, matched, marker, limit)
    return paged.all()
开发者ID:devcurmudgeon,项目名称:storyboard,代码行数:7,代码来源:sqlalchemy_impl.py
示例12: main
def main():
    """Entry point of the import command.

    Registers the CLI and logging options, parses the configuration,
    optionally sets the ``stories`` table AUTO_INCREMENT value, and then
    runs the loader for the configured origin. Only the ``launchpad``
    origin is supported; any other value prints a message and returns.

    Exits with status 1 when both ``only_tags`` and ``exclude_tags`` are
    given.
    """
    CONF.register_cli_opts(IMPORT_OPTS)
    try:
        log.register_options(CONF)
    except cfg.ArgsAlreadyParsedError:
        # The options were already registered and parsed; nothing to do.
        pass
    log.setup(CONF, 'storyboard')
    CONF(project='storyboard')

    # only_tags and exclude_tags are mutually exclusive
    if CONF.only_tags and CONF.exclude_tags:
        print('ERROR: only-tags and exclude-tags are mutually exclusive',
              file=sys.stderr)
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist in every interpreter setup.
        sys.exit(1)

    # If the user requested an autoincrement value, set that before we start
    # importing things. Note that mysql will automatically set the
    # autoincrement to the next-available id equal to or larger than the
    # requested one.
    auto_increment = CONF.auto_increment
    if auto_increment:
        print('Setting stories.AUTO_INCREMENT to %d' % (auto_increment,))
        session = db_api.get_session(in_request=False)
        # %d only accepts numbers, so no arbitrary text can reach the SQL.
        session.execute('ALTER TABLE stories AUTO_INCREMENT = %d;'
                        % (auto_increment,))

    # Compare by value: `is` tests object identity and may be False for an
    # equal string that comes from the parsed configuration.
    if CONF.origin == 'launchpad':
        loader = LaunchpadLoader(CONF.from_project, CONF.to_project,
                                 set(CONF.only_tags), set(CONF.exclude_tags))
        loader.run()
    else:
        print('Unsupported import origin: %s' % CONF.origin)
开发者ID:openstack-infra,项目名称:storyboard,代码行数:33,代码来源:cli.py
示例13: __init__
def __init__(self, project_name):
    """Create a new instance of the launchpad-to-storyboard data writer.

    Looks up the project named ``project_name`` and its ``master`` branch.
    If either is missing, a message is printed and the process exits with
    status 1.
    """
    # username -> openid
    self._openid_map = {}

    # openid -> SB User Entity
    self._user_map = {}

    # tag_name -> SB StoryTag Entity
    self._tag_map = {}

    # Build a session for the writer
    self.session = db_api.get_session(in_request=False)

    # SB Project Entity + Sanity check.
    project_query = db_api.model_query(Project, self.session)
    self.project = project_query.filter_by(name=project_name).first()
    if not self.project:
        print("Local project %s not found in storyboard, please create "
              "it first." % (project_name))
        sys.exit(1)

    branch_query = db_api.model_query(Branch, self.session)
    self.branch = branch_query.filter_by(
        project_id=self.project.id, name='master').first()
    if not self.branch:
        print("No master branch found for project %s, please create "
              "one first." % (project_name))
        sys.exit(1)
开发者ID:openstack-infra,项目名称:storyboard,代码行数:29,代码来源:writer.py
示例14: _worklist_get
def _worklist_get(id, session=None):
    """Fetch one Worklist by id with its ``items`` subquery-loaded.

    :param id: Value matched against the ``id`` column.
    :param session: Session to query with; a new one is obtained from
        ``api_base.get_session()`` when this is falsy.
    :return: The result of ``query.first()``.
    """
    active_session = session or api_base.get_session()
    eager_items = subqueryload(models.Worklist.items)
    worklist_query = active_session.query(models.Worklist)
    return worklist_query.options(eager_items).filter_by(id=id).first()
开发者ID:ColdrickSotK,项目名称:storyboard,代码行数:7,代码来源:worklists.py
示例15: story_add_tag
def story_add_tag(story_id, tag_name, current_user=None):
    """Attach the tag named ``tag_name`` to a story.

    The tag is created when no tag with that name exists yet. The story's
    ``updated_at`` is set to the current UTC time, and the story is
    expunged from the session afterwards.

    :raises exc.NotFound: if the story cannot be loaded.
    :raises exc.DBDuplicateEntry: if the story already has that tag.
    """
    session = api_base.get_session()

    with session.begin(subtransactions=True):
        # Get a tag or create a new one
        tag = story_tags.tag_get_by_name(tag_name, session=session)
        if not tag:
            tag = story_tags.tag_create({"name": tag_name})

        story = story_get_simple(
            story_id, session=session, current_user=current_user)
        if not story:
            raise exc.NotFound(_("%(name)s %(id)s not found") %
                               {'name': "Story", 'id': story_id})

        already_tagged = any(
            existing.name == tag_name for existing in story.tags)
        if already_tagged:
            raise exc.DBDuplicateEntry(
                _("The Story %(id)d already has a tag %(tag)s") %
                {'id': story_id, 'tag': tag_name})

        story.tags.append(tag)
        story.updated_at = datetime.datetime.now(tz=pytz.utc)
        session.add(story)

    session.expunge(story)
开发者ID:openstack-infra,项目名称:storyboard,代码行数:26,代码来源:stories.py
示例16: _board_get
def _board_get(id, session=None):
    """Fetch one Board by id with its ``lanes`` subquery-loaded.

    :param id: Value matched against the ``id`` column.
    :param session: Session to query with; a new one is obtained from
        ``api_base.get_session()`` when this is falsy.
    :return: The result of ``query.first()``.
    """
    active_session = session or api_base.get_session()
    eager_lanes = subqueryload(models.Board.lanes)
    board_query = active_session.query(models.Board)
    return board_query.options(eager_lanes).filter_by(id=id).first()
开发者ID:palvarez89,项目名称:storyboard,代码行数:7,代码来源:boards.py
示例17: get_item_by_item_id
def get_item_by_item_id(worklist, item_type, item_id, archived):
    """Find the first WorklistItem of a worklist matching the given fields.

    :param worklist: Object whose ``id`` is matched against ``list_id``.
    :param item_type: Value matched against ``item_type``.
    :param item_id: Value matched against ``item_id``.
    :param archived: Value matched against ``archived``.
    :return: The result of ``query.first()``.
    """
    criteria = {
        'list_id': worklist.id,
        'item_type': item_type,
        'item_id': item_id,
        'archived': archived,
    }
    item_query = api_base.get_session().query(models.WorklistItem)
    return item_query.filter_by(**criteria).first()
开发者ID:palvarez89,项目名称:storyboard,代码行数:7,代码来源:worklists.py
示例18: project_group_add_project
def project_group_add_project(project_group_id, project_id):
    """Attach a project to a project group and return the group.

    :raises exc.NotFound: if the group or the project cannot be loaded.
    :raises ClientSideError: if the project is already in the group.
    """
    session = api_base.get_session()

    with session.begin(subtransactions=True):
        group = _entity_get(project_group_id, session)
        if group is None:
            raise exc.NotFound(_("%(name)s %(id)s not found")
                               % {'name': "Project Group",
                                  'id': project_group_id})

        new_member = projects.project_get(project_id)
        if new_member is None:
            raise exc.NotFound(_("%(name)s %(id)s not found")
                               % {'name': "Project", 'id': project_id})

        already_member = any(
            member.id == project_id for member in group.projects)
        if already_member:
            raise ClientSideError(_("The Project %(id)d is already in "
                                    "Project Group %(group_id)d") %
                                  {'id': project_id,
                                   'group_id': project_group_id})

        group.projects.append(new_member)
        session.add(group)

    return group
开发者ID:openstack-infra,项目名称:storyboard,代码行数:25,代码来源:project_groups.py
示例19: project_update_updated_at
def project_update_updated_at(project_id):
    """Set a project's ``updated_at`` to the current UTC time.

    Does nothing when ``project_get`` returns a falsy value. The project
    is expunged from the session after the update.
    """
    session = api_base.get_session()
    project = project_get(project_id)
    if not project:
        return

    with session.begin(subtransactions=True):
        project.updated_at = datetime.datetime.now(tz=pytz.utc)
        session.add(project)

    session.expunge(project)
开发者ID:openstack-infra,项目名称:storyboard,代码行数:8,代码来源:projects.py
示例20: _entity_get
def _entity_get(id, session=None):
    """Fetch one Team by id with its ``users`` subquery-loaded.

    :param id: Value matched against the ``id`` column.
    :param session: Session to query with; a new one is obtained from
        ``api_base.get_session()`` when this is falsy.
    :return: The result of ``query.first()``.
    """
    active_session = session or api_base.get_session()
    eager_users = subqueryload(models.Team.users)
    team_query = active_session.query(models.Team)
    return team_query.options(eager_users).filter_by(id=id).first()
开发者ID:openstack-infra,项目名称:storyboard,代码行数:8,代码来源:teams.py
注:本文中的storyboard.db.api.base.get_session函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论