本文整理汇总了Python中pulp.server.db.model.repository.RepoSyncResult类的典型用法代码示例。如果您正苦于以下问题:Python RepoSyncResult类的具体用法?Python RepoSyncResult怎么用?Python RepoSyncResult使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了RepoSyncResult类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _do_sync
def _do_sync(self, repo, importer_instance, transfer_repo, conduit, call_config):
"""
Once all of the preparation for a sync has taken place, this call
will perform the sync, making the necessary database updates. It returns
the sync result instance (already saved to the database). This call
does not have any behavior based on the success/failure of the sync;
it is up to the caller to raise an exception in the event of a failed
sync if that behavior is desired.
"""
importer_coll = RepoImporter.get_collection()
sync_result_coll = RepoSyncResult.get_collection()
repo_id = repo['id']
# Perform the sync
sync_start_timestamp = _now_timestamp()
try:
sync_report = importer_instance.sync_repo(transfer_repo, conduit, call_config)
except Exception, e:
# I really wish python 2.4 supported except and finally together
sync_end_timestamp = _now_timestamp()
# Reload the importer in case the plugin edits the scratchpad
repo_importer = importer_coll.find_one({'repo_id' : repo_id})
repo_importer['last_sync'] = sync_end_timestamp
importer_coll.save(repo_importer, safe=True)
# Add a sync history entry for this run
result = RepoSyncResult.error_result(repo_id, repo_importer['id'], repo_importer['importer_type_id'],
sync_start_timestamp, sync_end_timestamp, e, sys.exc_info()[2])
sync_result_coll.save(result, safe=True)
_LOG.exception(_('Exception caught from plugin during sync for repo [%(r)s]' % {'r' : repo_id}))
raise PulpExecutionException(), None, sys.exc_info()[2]
开发者ID:jessegonzalez,项目名称:pulp,代码行数:34,代码来源:sync.py
示例2: sync
def sync(repo_id, sync_config_override=None):
"""
Performs a synchronize operation on the given repository and triggers publishs for distributors
with autopublish enabled.
The given repo must have an importer configured. This method is intentionally limited to
synchronizing a single repo. Performing multiple repository syncs concurrently will require a
more global view of the server and must be handled outside the scope of this class.
:param repo_id: identifies the repo to sync
:type repo_id: str
:param sync_config_override: optional config containing values to use for this sync only
:type sync_config_override: dict
:return: TaskResult containing sync results and a list of spawned tasks
:rtype: pulp.server.async.tasks.TaskResult
:raise pulp_exceptions.MissingResource: if specified repo does not exist, or it does not have
an importer and associated plugin
:raise pulp_exceptions.PulpExecutionException: if the task fails.
"""
repo_obj = model.Repository.objects.get_repo_or_missing_resource(repo_id)
transfer_repo = repo_obj.to_transfer_repo()
importer_collection = RepoImporter.get_collection()
repo_importer = importer_collection.find_one({'repo_id': repo_obj.repo_id})
if repo_importer is None:
raise pulp_exceptions.MissingResource(repository=repo_id)
try:
importer, imp_config = plugin_api.get_importer_by_id(repo_importer['importer_type_id'])
except plugin_exceptions.PluginNotFound:
raise pulp_exceptions.MissingResource(repository=repo_id)
call_config = PluginCallConfiguration(imp_config, repo_importer['config'], sync_config_override)
transfer_repo.working_dir = common_utils.get_working_directory()
conduit = RepoSyncConduit(repo_id, repo_importer['id'])
sync_result_collection = RepoSyncResult.get_collection()
# Fire an events around the call
fire_manager = manager_factory.event_fire_manager()
fire_manager.fire_repo_sync_started(repo_id)
# Perform the sync
sync_start_timestamp = _now_timestamp()
sync_result = None
try:
# Replace the Importer's sync_repo() method with our register_sigterm_handler decorator,
# which will set up cancel_sync_repo() as the target for the signal handler
sync_repo = register_sigterm_handler(importer.sync_repo, importer.cancel_sync_repo)
sync_report = sync_repo(transfer_repo, conduit, call_config)
except Exception, e:
sync_end_timestamp = _now_timestamp()
sync_result = RepoSyncResult.error_result(
repo_obj.repo_id, repo_importer['id'], repo_importer['importer_type_id'],
sync_start_timestamp, sync_end_timestamp, e, sys.exc_info()[2])
raise
开发者ID:zjhuntin,项目名称:pulp,代码行数:60,代码来源:repository.py
示例3: add_result
def add_result(repo_id, offset):
started = datetime.datetime.now(dateutils.local_tz())
completed = started + datetime.timedelta(days=offset)
r = RepoSyncResult.expected_result(repo_id, 'foo', 'bar', dateutils.format_iso8601_datetime(started),
dateutils.format_iso8601_datetime(completed), 1, 1, 1, '', '',
RepoSyncResult.RESULT_SUCCESS)
RepoSyncResult.get_collection().save(r, safe=True)
开发者ID:cliffy94,项目名称:pulp,代码行数:7,代码来源:test_repo_sync_manager.py
示例4: test_sync_history_end_date
def test_sync_history_end_date(self):
"""
Tests the functionality of requesting sync history before a given date
"""
# Setup
self.repo_manager.create_repo('test_repo')
# A date string to fake some dates
date_string = '2013-06-01T12:00:0%sZ'
# Create 3 entries, with each date entry one second later
for i in range(0, 6, 2):
r = RepoSyncResult.expected_result('test_repo', 'foo', 'bar', date_string % str(i),
date_string % str(i + 1), 1, 1, 1, '', '',
RepoSyncResult.RESULT_SUCCESS)
RepoSyncResult.get_collection().save(r, safe=True)
# Verify three entries in test_repo
self.assertEqual(3, len(self.sync_manager.sync_history('test_repo')))
# Retrieve the first two entries
end_date = '2013-06-01T12:00:03Z'
end_entries = self.sync_manager.sync_history('test_repo', end_date=end_date)
# Confirm the dates of the retrieved entries are earlier than or equal to the requested date
self.assertEqual(2, len(end_entries))
for entry in end_entries:
retrieved = dateutils.parse_iso8601_datetime(entry['started'])
given_end = dateutils.parse_iso8601_datetime(end_date)
self.assertTrue(retrieved <= given_end)
开发者ID:beav,项目名称:pulp,代码行数:26,代码来源:test_sync.py
示例5: _do_sync
def _do_sync(repo, importer_instance, transfer_repo, conduit, call_config):
"""
Once all of the preparation for a sync has taken place, this call
will perform the sync, making the necessary database updates. It returns
the sync result instance (already saved to the database). This call
does not have any behavior based on the success/failure of the sync;
it is up to the caller to raise an exception in the event of a failed
sync if that behavior is desired.
"""
importer_coll = RepoImporter.get_collection()
sync_result_coll = RepoSyncResult.get_collection()
repo_id = repo['id']
repo_importer = importer_coll.find_one({'repo_id': repo_id})
# Perform the sync
sync_start_timestamp = _now_timestamp()
sync_end_timestamp = None
result = None
try:
# Replace the Importer's sync_repo() method with our register_sigterm_handler decorator,
# which will set up cancel_sync_repo() as the target for the signal handler
sync_repo = register_sigterm_handler(importer_instance.sync_repo,
importer_instance.cancel_sync_repo)
sync_report = sync_repo(transfer_repo, conduit, call_config)
except Exception, e:
sync_end_timestamp = _now_timestamp()
result = RepoSyncResult.error_result(
repo_id, repo_importer['id'], repo_importer['importer_type_id'],
sync_start_timestamp, sync_end_timestamp, e, sys.exc_info()[2])
raise
开发者ID:mykelalvis,项目名称:pulp,代码行数:35,代码来源:sync.py
示例6: _do_sync
def _do_sync(self, repo, importer_instance, transfer_repo, conduit, call_config):
"""
Once all of the preparation for a sync has taken place, this call
will perform the sync, making the necessary database updates. It returns
the sync result instance (already saved to the database). This call
does not have any behavior based on the success/failure of the sync;
it is up to the caller to raise an exception in the event of a failed
sync if that behavior is desired.
"""
importer_coll = RepoImporter.get_collection()
sync_result_coll = RepoSyncResult.get_collection()
repo_id = repo['id']
repo_importer = importer_coll.find_one({'repo_id' : repo_id})
# Perform the sync
sync_start_timestamp = _now_timestamp()
sync_end_timestamp = None
result = None
try:
sync_report = importer_instance.sync_repo(transfer_repo, conduit, call_config)
except Exception, e:
sync_end_timestamp = _now_timestamp()
result = RepoSyncResult.error_result(repo_id, repo_importer['id'], repo_importer['importer_type_id'],
sync_start_timestamp, sync_end_timestamp, e, sys.exc_info()[2])
_LOG.exception(_('Exception caught from plugin during sync for repo [%(r)s]' % {'r' : repo_id}))
raise PulpExecutionException(), None, sys.exc_info()[2]
开发者ID:kbotc,项目名称:pulp,代码行数:31,代码来源:sync.py
示例7: tearDown
def tearDown(self):
super(TestDoSync, self).tearDown()
mock_plugins.reset()
manager_factory.reset()
Repo.get_collection().remove()
RepoImporter.get_collection().remove()
RepoSyncResult.get_collection().remove()
MockRepoPublishManager.reset()
开发者ID:beav,项目名称:pulp,代码行数:8,代码来源:test_sync.py
示例8: clean
def clean(self):
super(RepoSyncManagerTests, self).clean()
Repo.get_collection().remove()
RepoImporter.get_collection().remove()
RepoSyncResult.get_collection().remove()
# Reset the state of the mock's tracker variables
MockRepoPublishManager.reset()
开发者ID:beav,项目名称:pulp,代码行数:8,代码来源:test_sync.py
示例9: sync_history
def sync_history(start_date, end_date, repo_id):
"""
Returns a cursor containing the sync history entries for the given repo.
:param start_date: if specified, no events prior to this date will be returned. Expected to be
an iso8601 datetime string.
:type start_date: str
:param end_date: if specified, no events after this date will be returned. Expected to be an
iso8601 datetime string.
:type end_date: str
:param repo_id: identifies the repo
:type repo_id: str
:return: object containing sync history results
:rtype: pymongo.cursor.Cursor
:raise MissingResource: if repo_id does not reference a valid repo
"""
model.Repository.objects.get_repo_or_missing_resource(repo_id)
search_params = {'repo_id': repo_id}
date_range = {}
if start_date:
date_range['$gte'] = start_date
if end_date:
date_range['$lte'] = end_date
if start_date or end_date:
search_params['started'] = date_range
return RepoSyncResult.get_collection().find(search_params)
开发者ID:zjhuntin,项目名称:pulp,代码行数:28,代码来源:repository.py
示例10: sync_history
def sync_history(self, repo_id, limit=None):
"""
Returns sync history entries for the given repo, sorted from most recent
to oldest. If there are no entries, an empty list is returned.
@param repo_id: identifies the repo
@type repo_id: str
@param limit: maximum number of results to return
@type limit: int
@return: list of sync history result instances
@rtype: list of L{pulp.server.db.model.repository.RepoSyncResult}
@raise MissingResource: if repo_id does not reference a valid repo
"""
# Validation
repo = Repo.get_collection().find_one({'id' : repo_id})
if repo is None:
raise MissingResource(repo_id)
if limit is None:
limit = 10 # default here for each of REST API calls into here
# Retrieve the entries
cursor = RepoSyncResult.get_collection().find({'repo_id' : repo_id})
cursor.limit(limit)
cursor.sort('completed', pymongo.DESCENDING)
return list(cursor)
开发者ID:jessegonzalez,项目名称:pulp,代码行数:31,代码来源:sync.py
示例11: test_sync
def test_sync(self, mock_finished, mock_started):
"""
Tests sync under normal conditions where everything is configured
correctly. No importer config is specified.
"""
# Setup
sync_config = {'bruce': 'hulk', 'tony': 'ironman'}
self.repo_manager.create_repo('repo-1')
self.importer_manager.set_importer('repo-1', 'mock-importer', sync_config)
# Test
self.sync_manager.sync('repo-1', sync_config_override=None)
# Verify
repo = Repo.get_collection().find_one({'id': 'repo-1'})
repo_importer = RepoImporter.get_collection().find_one({'repo_id': 'repo-1',
'id': 'mock-importer'})
# Database
self.assertTrue(repo_importer['last_sync'] is not None)
self.assertTrue(assert_last_sync_time(repo_importer['last_sync']))
# Call into the Importer
sync_args = mock_plugins.MOCK_IMPORTER.sync_repo.call_args[0]
self.assertEqual(repo['id'], sync_args[0].id)
self.assertTrue(sync_args[1] is not None)
self.assertEqual({}, sync_args[2].plugin_config)
self.assertEqual(sync_config, sync_args[2].repo_plugin_config)
self.assertEqual({}, sync_args[2].override_config)
# History Entry
history = list(RepoSyncResult.get_collection().find({'repo_id': 'repo-1'}))
self.assertEqual(1, len(history))
self.assertEqual('repo-1', history[0]['repo_id'])
self.assertEqual(RepoSyncResult.RESULT_SUCCESS, history[0]['result'])
self.assertEqual('mock-importer', history[0]['importer_id'])
self.assertEqual('mock-importer', history[0]['importer_type_id'])
self.assertTrue(history[0]['started'] is not None)
self.assertTrue(history[0]['completed'] is not None)
self.assertEqual(10, history[0]['added_count'])
self.assertEqual(1, history[0]['removed_count'])
self.assertTrue(history[0]['summary'] is not None)
self.assertTrue(history[0]['details'] is not None)
self.assertTrue(history[0]['error_message'] is None)
self.assertTrue(history[0]['exception'] is None)
self.assertTrue(history[0]['traceback'] is None)
self.assertEqual(1, mock_started.call_count)
self.assertEqual('repo-1', mock_started.call_args[0][0])
self.assertEqual(1, mock_finished.call_count)
self.assertEqual('repo-1', mock_finished.call_args[0][0]['repo_id'])
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:56,代码来源:test_sync.py
示例12: test_sync_history_descending_sort
def test_sync_history_descending_sort(self):
# Setup
self.repo_manager.create_repo('test_sort')
date_string = '2013-06-01T12:00:0%sZ'
# Add some consecutive sync entries
for i in range(0, 10, 2):
r = RepoSyncResult.expected_result('test_sort', 'foo', 'bar', date_string % str(i),
date_string % str(i + 1), 1, 1, 1, '', '',
RepoSyncResult.RESULT_SUCCESS)
RepoSyncResult.get_collection().save(r, safe=True)
# Test sort by descending start date
entries = self.sync_manager.sync_history(repo_id='test_sort', sort=constants.SORT_DESCENDING)
self.assertEqual(5, len(entries))
# Verify that each entry has a later completed date than the next one
for i in range(0, 4):
first = dateutils.parse_iso8601_datetime(entries[i]['started'])
second = dateutils.parse_iso8601_datetime(entries[i + 1]['started'])
self.assertTrue(first > second)
开发者ID:aweiteka,项目名称:pulp,代码行数:20,代码来源:test_sync.py
示例13: test_sync_cancelled
def test_sync_cancelled(self, mock_current_task_state):
"""
Test the repo sync result on a cancelled sync.
"""
repo_id = 'cancelled_repo'
self.repo_manager.create_repo(repo_id)
self.importer_manager.set_importer(repo_id, 'mock-importer', {})
mock_plugins.MOCK_IMPORTER.sync_repo.return_value = None
self.sync_manager.sync(repo_id)
sync_result = RepoSyncResult.get_collection().find_one({'repo_id': repo_id})
self.assertFalse(sync_result is None)
self.assertEqual(sync_result['result'], RepoSyncResult.RESULT_CANCELED)
self.assertEqual(mock_current_task_state.call_count, 1)
开发者ID:bartwo,项目名称:pulp,代码行数:15,代码来源:test_repo_sync_manager.py
示例14: test_sync_with_error
def test_sync_with_error(self):
"""
Tests a sync when the plugin raises an error.
"""
# Setup
class FakePluginException(Exception):
pass
error_msg = 'Error test'
mock_plugins.MOCK_IMPORTER.sync_repo.side_effect = FakePluginException(error_msg)
self.repo_manager.create_repo('gonna-bail')
self.importer_manager.set_importer('gonna-bail', 'mock-importer', {})
# Test
self.assertRaises(Exception, self.sync_manager.sync, 'gonna-bail')
# Database
repo_importer = RepoImporter.get_collection().find_one({'repo_id': 'gonna-bail',
'id': 'mock-importer'})
self.assertTrue(repo_importer['last_sync'] is not None)
self.assertTrue(assert_last_sync_time(repo_importer['last_sync']))
# History Entry
history = list(RepoSyncResult.get_collection().find({'repo_id': 'gonna-bail'}))
self.assertEqual(1, len(history))
self.assertEqual('gonna-bail', history[0]['repo_id'])
self.assertEqual(RepoSyncResult.RESULT_ERROR, history[0]['result'])
self.assertEqual('mock-importer', history[0]['importer_id'])
self.assertEqual('mock-importer', history[0]['importer_type_id'])
self.assertTrue(history[0]['started'] is not None)
self.assertTrue(history[0]['completed'] is not None)
self.assertTrue(history[0]['added_count'] is None)
self.assertTrue(history[0]['updated_count'] is None)
self.assertTrue(history[0]['removed_count'] is None)
self.assertTrue(history[0]['summary'] is None)
self.assertTrue(history[0]['details'] is None)
self.assertEqual(error_msg, history[0]['error_message'])
self.assertTrue('FakePluginException' in history[0]['exception'])
self.assertTrue(history[0]['traceback'] is not None)
# Cleanup
mock_plugins.MOCK_IMPORTER.sync_repo.side_effect = None
开发者ID:beav,项目名称:pulp,代码行数:47,代码来源:test_sync.py
示例15: test_sync_no_plugin_report
def test_sync_no_plugin_report(self):
"""
Tests synchronizing against a sloppy plugin that doesn't return a sync report.
"""
# Setup
self.repo_manager.create_repo('repo-1')
self.importer_manager.set_importer('repo-1', 'mock-importer', {})
mock_plugins.MOCK_IMPORTER.sync_repo.return_value = None # sloppy plugin
# Test
self.sync_manager.sync('repo-1')
# Verify
# History Entry
history = list(RepoSyncResult.get_collection().find({'repo_id' : 'repo-1'}))
self.assertEqual(1, len(history))
self.assertEqual('repo-1', history[0]['repo_id'])
self.assertEqual(RepoSyncResult.RESULT_ERROR, history[0]['result'])
self.assertEqual('mock-importer', history[0]['importer_id'])
self.assertEqual('mock-importer', history[0]['importer_type_id'])
self.assertTrue(history[0]['started'] is not None)
self.assertTrue(history[0]['completed'] is not None)
self.assertEqual(-1, history[0]['added_count'])
self.assertEqual(-1, history[0]['updated_count'])
self.assertEqual(-1, history[0]['removed_count'])
expected_message = ('Plugin type [mock-importer] on repo [repo-1] did not return a valid '
'sync report')
self.assertEqual(expected_message, history[0]['summary'])
self.assertEqual(expected_message, history[0]['details'])
self.assertTrue(history[0]['error_message'] is None)
self.assertTrue(history[0]['exception'] is None)
self.assertTrue(history[0]['traceback'] is None)
开发者ID:aweiteka,项目名称:pulp,代码行数:38,代码来源:test_sync.py
示例16: test_sync_with_graceful_fail
def test_sync_with_graceful_fail(self):
# Setup
sync_config = {'bruce' : 'hulk', 'tony' : 'ironman'}
self.repo_manager.create_repo('repo-1')
self.importer_manager.set_importer('repo-1', 'mock-importer', sync_config)
mock_plugins.MOCK_IMPORTER.sync_repo.return_value = SyncReport(False, 10, 5, 1, 'Summary of the sync', 'Details of the sync')
# Test
self.assertRaises(PulpExecutionException, self.sync_manager.sync, 'repo-1')
# Verify
history = list(RepoSyncResult.get_collection().find({'repo_id' : 'repo-1'}))
self.assertEqual(1, len(history))
self.assertEqual('repo-1', history[0]['repo_id'])
self.assertEqual(RepoSyncResult.RESULT_FAILED, history[0]['result'])
self.assertEqual('mock-importer', history[0]['importer_id'])
self.assertEqual('mock-importer', history[0]['importer_type_id'])
self.assertTrue(history[0]['started'] is not None)
self.assertTrue(history[0]['completed'] is not None)
# Cleanup
mock_plugins.reset()
开发者ID:aweiteka,项目名称:pulp,代码行数:23,代码来源:test_sync.py
示例17: _validate_repo_sync_result
def _validate_repo_sync_result():
objectdb = RepoSyncResult.get_collection()
reference = RepoSyncResult('', '', '', '', '', '')
return _validate_model(RepoSyncResult.__name__, objectdb, reference)
开发者ID:ehelms,项目名称:pulp,代码行数:4,代码来源:validate.py
示例18: print
try:
importer, config = self.sync_manager._get_importer_instance_and_config('gonna-bail')
self.sync_manager.sync('gonna-bail', importer, config)
except repo_sync_manager.PulpExecutionException, e:
print(e) # for coverage
# Verify
# Database
repo_importer = RepoImporter.get_collection().find_one({'repo_id' : 'gonna-bail', 'id' : 'mock-importer'})
self.assertTrue(repo_importer['last_sync'] is not None)
self.assertTrue(assert_last_sync_time(repo_importer['last_sync']))
# History Entry
history = list(RepoSyncResult.get_collection().find({'repo_id' : 'gonna-bail'}))
self.assertEqual(1, len(history))
self.assertEqual('gonna-bail', history[0]['repo_id'])
self.assertEqual(RepoSyncResult.RESULT_ERROR, history[0]['result'])
self.assertEqual('mock-importer', history[0]['importer_id'])
self.assertEqual('mock-importer', history[0]['importer_type_id'])
self.assertTrue(history[0]['started'] is not None)
self.assertTrue(history[0]['completed'] is not None)
self.assertTrue(history[0]['added_count'] is None)
self.assertTrue(history[0]['updated_count'] is None)
self.assertTrue(history[0]['removed_count'] is None)
self.assertTrue(history[0]['summary'] is None)
self.assertTrue(history[0]['details'] is None)
self.assertEqual(error_msg, history[0]['error_message'])
开发者ID:ryanschneider,项目名称:pulp,代码行数:31,代码来源:test_repo_sync_manager.py
示例19: len
logger.exception('Error while deleting repo working dir [%s] for repo [%s]' % (
repo_working_dir, repo_id))
error_tuples.append(e)
# Database Updates
try:
Repo.get_collection().remove({'id' : repo_id}, safe=True)
# Remove all importers and distributors from the repo
# This is likely already done by the calls to other methods in
# this manager, but in case those failed we still want to attempt
# to keep the database clean
RepoDistributor.get_collection().remove({'repo_id' : repo_id}, safe=True)
RepoImporter.get_collection().remove({'repo_id' : repo_id}, safe=True)
RepoSyncResult.get_collection().remove({'repo_id' : repo_id}, safe=True)
RepoPublishResult.get_collection().remove({'repo_id' : repo_id}, safe=True)
# Remove all associations from the repo
RepoContentUnit.get_collection().remove({'repo_id' : repo_id}, safe=True)
except Exception, e:
msg = _('Error updating one or more database collections while removing repo [%(r)s]')
msg = msg % {'r': repo_id}
logger.exception(msg)
error_tuples.append(e)
# remove the repo from any groups it was a member of
group_manager = manager_factory.repo_group_manager()
group_manager.remove_repo_from_groups(repo_id)
if len(error_tuples) > 0:
开发者ID:aweiteka,项目名称:pulp,代码行数:31,代码来源:cud.py
示例20: sync_history
def sync_history(self, repo_id, limit=None, sort=constants.SORT_DESCENDING, start_date=None,
end_date=None):
"""
Returns sync history entries for the given repo, sorted from most recent
to oldest. If there are no entries, an empty list is returned.
:param repo_id: identifies the repo
:type repo_id: str
:param limit: if specified, the query will only return up to this amount of
entries; default is to return the entire sync history
:type limit: int
:param sort: Indicates the sort direction of the results, which are sorted by start date. Options
are "ascending" and "descending". Descending is the default.
:type sort: str
:param start_date: if specified, no events prior to this date will be returned. Expected to be an
iso8601 datetime string.
:type start_date: str
:param end_date: if specified, no events after this date will be returned. Expected to be an
iso8601 datetime string.
:type end_date: str
:return: list of sync history result instances
:rtype: list
:raise MissingResource: if repo_id does not reference a valid repo
:raise InvalidValue: if one or more options are invalid
"""
# Validation
repo = Repo.get_collection().find_one({'id': repo_id})
if repo is None:
raise MissingResource(repo_id)
invalid_values = []
# Verify the limit makes sense
if limit is not None:
try:
limit = int(limit)
if limit < 1:
invalid_values.append('limit')
except ValueError:
invalid_values.append('limit')
# Verify the sort direction is valid
if sort not in constants.SORT_DIRECTION:
invalid_values.append('sort')
# Verify that start_date and end_date is valid
if start_date is not None:
try:
dateutils.parse_iso8601_datetime(start_date)
except (ValueError, isodate.ISO8601Error):
invalid_values.append('start_date')
if end_date is not None:
try:
dateutils.parse_iso8601_datetime(end_date)
except (ValueError, isodate.ISO8601Error):
invalid_values.append('end_date')
# Report any invalid values
if invalid_values:
raise InvalidValue(invalid_values)
# Assemble the mongo search parameters
search_params = {'repo_id': repo_id}
# Add in date range limits if specified
date_range = {}
if start_date:
date_range['$gte'] = start_date
if end_date:
date_range['$lte'] = end_date
if len(date_range) > 0:
search_params['started'] = date_range
# Retrieve the entries
cursor = RepoSyncResult.get_collection().find(search_params)
# Sort the results on the 'started' field. By default, descending order is used
cursor.sort('started', direction=constants.SORT_DIRECTION[sort])
if limit is not None:
cursor.limit(limit)
return list(cursor)
开发者ID:aweiteka,项目名称:pulp,代码行数:82,代码来源:sync.py
注:本文中的pulp.server.db.model.repository.RepoSyncResult类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论