本文整理汇总了Python中qonos.common.utils.serialize_datetimes函数的典型用法代码示例。如果您正苦于以下问题:Python serialize_datetimes函数的具体用法?Python serialize_datetimes怎么用?Python serialize_datetimes使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了serialize_datetimes函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get
def get(self, request, job_id):
    """Return the job identified by ``job_id`` under the ``job`` key.

    Raises ``webob.exc.HTTPNotFound`` when the DB layer raises
    ``exception.NotFound`` for the given id.
    """
    try:
        found = self.db_api.job_get_by_id(job_id)
    except exception.NotFound:
        raise webob.exc.HTTPNotFound

    # Datetime values are serialized on the dict itself before returning it.
    utils.serialize_datetimes(found)
    response = {"job": found}
    return response
开发者ID:komawar,项目名称:qonos,代码行数:7,代码来源:jobs.py
示例2: create
def create(self, request, body):
    """Create a ``QUEUED`` job for the schedule named in the request body.

    Steps, in order:
      1. Validate that ``body['job']['schedule_id']`` is present.
      2. Load the schedule.
      3. Test-and-set the schedule's ``next_run`` against the ``next_run``
         supplied by the caller (parsed from ISO time when given).
      4. Stamp the schedule's ``last_scheduled``.
      5. Create the job, copying tenant, action and metadata from the
         schedule, and emit a ``qonos.job.create`` notification.

    Raises:
        webob.exc.HTTPBadRequest: body, ``job`` or ``schedule_id`` missing.
        webob.exc.HTTPNotFound: the schedule could not be found.
        webob.exc.HTTPConflict: the ``next_run`` test-and-set failed.

    Returns:
        ``{'job': <created job>}``
    """
    if (body is None or body.get('job') is None or
            body['job'].get('schedule_id') is None):
        raise webob.exc.HTTPBadRequest()
    job = body['job']

    try:
        schedule = self.db_api.schedule_get_by_id(job['schedule_id'])
    except exception.NotFound:
        raise webob.exc.HTTPNotFound()

    # Check integrity of schedule and update next run
    expected_next_run = job.get('next_run')
    if expected_next_run:
        expected_next_run = timeutils.parse_isotime(expected_next_run)

    next_run = api_utils.schedule_to_next_run(schedule, timeutils.utcnow())
    try:
        self.db_api.schedule_test_and_set_next_run(
            schedule['id'], expected_next_run, next_run)
    except exception.NotFound:
        # The adjacent literals are concatenated, so each fragment must
        # carry its own separating space.
        msg = _("Specified next run does not match the current next run"
                " value. This could mean schedule has either changed"
                " or has already been scheduled since you last expected.")
        raise webob.exc.HTTPConflict(explanation=msg)

    # Update schedule last_scheduled
    self.db_api.schedule_update(
        schedule['id'], {'last_scheduled': timeutils.utcnow()})

    # Create job
    values = {}
    values.update(job)
    values['tenant'] = schedule['tenant']
    values['action'] = schedule['action']
    values['status'] = 'QUEUED'
    # Only key/value are copied from each schedule metadata entry.
    values['job_metadata'] = [
        {'key': metadata['key'], 'value': metadata['value']}
        for metadata in schedule['schedule_metadata']
    ]

    job_action = values['action']
    if 'timeout' not in values:
        values['timeout'] = api_utils.get_new_timeout_by_action(job_action)
    # NOTE(review): the original indentation was lost in this copy, so it
    # is unclear whether hard_timeout was set only when 'timeout' was
    # missing. It is set unconditionally here -- confirm against upstream.
    values['hard_timeout'] = \
        api_utils.get_new_timeout_by_action(job_action)

    job = self.db_api.job_create(values)
    utils.serialize_datetimes(job)
    api_utils.serialize_job_metadata(job)
    job = {'job': job}
    utils.generate_notification(None, 'qonos.job.create', job, 'INFO')
    return job
开发者ID:pperezrubio,项目名称:qonos,代码行数:60,代码来源:jobs.py
示例3: create
def create(self, request, body=None):
    """Validate the request body and create a schedule from it.

    All validation problems are collected and reported together in a single
    ``webob.exc.HTTPBadRequest``. On success the created schedule is
    returned under the ``schedule`` key.
    """
    problems = []
    if not body:
        problems.append('request body is empty')
    elif 'schedule' not in body:
        problems.append('request body needs "schedule" entity')
    else:
        # Both fields are mandatory and must be truthy.
        for field in ('tenant', 'action'):
            if not body['schedule'].get(field):
                problems.append('request body needs "%s" entity' % field)

    if problems:
        msg = _('The following errors occured with your request: %s') \
            % ', '.join(problems)
        raise webob.exc.HTTPBadRequest(explanation=msg)

    schedule_body = body['schedule']
    api_utils.deserialize_schedule_metadata(schedule_body)

    values = dict(schedule_body)
    values['next_run'] = api_utils.schedule_to_next_run(schedule_body)

    created = self.db_api.schedule_create(values)
    utils.serialize_datetimes(created)
    api_utils.serialize_schedule_metadata(created)
    return {'schedule': created}
开发者ID:ameade,项目名称:qonos,代码行数:26,代码来源:schedules.py
示例4: get
def get(self, request, worker_id):
    """Look up a single worker and return it under the ``worker`` key.

    Raises ``webob.exc.HTTPNotFound`` with an explanatory message when the
    DB layer raises ``exception.NotFound``.
    """
    try:
        record = self.db_api.worker_get_by_id(worker_id)
    except exception.NotFound:
        raise webob.exc.HTTPNotFound(
            explanation=_('Worker %s could not be found.') % worker_id)

    utils.serialize_datetimes(record)
    return {'worker': record}
开发者ID:isethi,项目名称:qonos,代码行数:8,代码来源:workers.py
示例5: get
def get(self, request, schedule_id):
    """Return the schedule identified by ``schedule_id``.

    Raises ``webob.exc.HTTPNotFound`` with an explanatory message when the
    DB layer raises ``exception.NotFound``.

    Returns:
        ``{'schedule': <schedule>}`` with datetime values serialized.
    """
    # Only the lookup is guarded: a failure while serializing should not be
    # reported to the client as "schedule not found".
    try:
        schedule = self.db_api.schedule_get_by_id(schedule_id)
    except exception.NotFound:
        msg = _('Schedule %s could not be found.') % schedule_id
        raise webob.exc.HTTPNotFound(explanation=msg)

    utils.serialize_datetimes(schedule)
    return {'schedule': schedule}
开发者ID:komawar,项目名称:qonos,代码行数:8,代码来源:schedules.py
示例6: update
def update(self, request, schedule_id, body):
    """Update an existing schedule from ``body['schedule']``.

    If any of the time fields (minute, hour, month, day_of_week,
    day_of_month) is present, all five are rewritten (absent ones become
    ``None``) and ``next_run`` is recalculated from them. Otherwise a
    caller-supplied ``next_run`` is only checked for parseability.

    Raises:
        webob.exc.HTTPBadRequest: empty body, missing ``schedule`` entity,
            blank/null ``tenant``, or unparseable ``next_run``.
        webob.exc.HTTPForbidden: a read-only property was supplied.
        webob.exc.HTTPNotFound: the schedule could not be found.

    Returns:
        ``{'schedule': <updated schedule>}``
    """
    if not body:
        msg = _('The request body must not be empty')
        raise webob.exc.HTTPBadRequest(explanation=msg)
    if 'schedule' not in body:
        msg = _('The request body must contain a "schedule" entity')
        raise webob.exc.HTTPBadRequest(explanation=msg)

    schedule_body = body['schedule']
    # NOTE(jculp): only raise if a blank tenant is passed
    # passing no tenant at all is perfectly fine.
    # A null tenant is treated as blank so the client gets a 400 instead
    # of an AttributeError from calling .strip() on None.
    if ('tenant' in schedule_body and
            not (schedule_body['tenant'] or '').strip()):
        msg = _('The request body has not specified a "tenant" entity')
        raise webob.exc.HTTPBadRequest(explanation=msg)

    api_utils.deserialize_schedule_metadata(schedule_body)
    values = {}
    values.update(schedule_body)

    try:
        values = api_utils.check_read_only_properties(values)
    except exception.Forbidden as e:
        raise webob.exc.HTTPForbidden(explanation=unicode(e))

    request_next_run = schedule_body.get('next_run')
    times = {
        'minute': None,
        'hour': None,
        'month': None,
        'day_of_week': None,
        'day_of_month': None,
    }
    update_schedule_times = False
    for key in times:
        if key in values:
            times[key] = values[key]
            update_schedule_times = True

    if update_schedule_times:
        # NOTE(ameade): We must recalculate the schedules next_run time
        # since the schedule has changed
        values.update(times)
        values['next_run'] = api_utils.schedule_to_next_run(times)
    elif request_next_run:
        # The parsed result is discarded; this is a format check only.
        try:
            timeutils.parse_isotime(request_next_run)
        except ValueError:
            msg = _('Invalid "next_run" value. Must be ISO 8601 format')
            raise webob.exc.HTTPBadRequest(explanation=msg)

    try:
        schedule = self.db_api.schedule_update(schedule_id, values)
    except exception.NotFound:
        msg = _('Schedule %s could not be found.') % schedule_id
        raise webob.exc.HTTPNotFound(explanation=msg)

    utils.serialize_datetimes(schedule)
    api_utils.serialize_schedule_metadata(schedule)
    return {'schedule': schedule}
开发者ID:ameade,项目名称:qonos,代码行数:58,代码来源:schedules.py
示例7: test_serialize_datetimes
def test_serialize_datetimes(self):
    """Top-level datetime values are replaced in place by ISO strings."""
    payload = {
        'foo': datetime.datetime(2012, 5, 16, 15, 27, 36, 325355),
        'bar': datetime.datetime(2013, 5, 16, 15, 27, 36, 325355),
    }
    # Microseconds are expected to be dropped from the serialized form.
    wanted = {
        'foo': '2012-05-16T15:27:36Z',
        'bar': '2013-05-16T15:27:36Z',
    }
    utils.serialize_datetimes(payload)
    self.assertEqual(payload, wanted)
开发者ID:isethi,项目名称:qonos,代码行数:9,代码来源:test_utils.py
示例8: get_heartbeat
def get_heartbeat(self, request, job_id):
    """Return the job's ``updated_at`` value under the ``heartbeat`` key.

    Raises ``webob.exc.HTTPNotFound`` with an explanatory message when the
    DB layer raises ``exception.NotFound``.
    """
    try:
        last_update = self.db_api.job_updated_at_get_by_id(job_id)
    except exception.NotFound:
        raise webob.exc.HTTPNotFound(
            explanation=_("Job %s could not be found.") % job_id)

    response = {"heartbeat": last_update}
    utils.serialize_datetimes(response)
    return response
开发者ID:komawar,项目名称:qonos,代码行数:10,代码来源:jobs.py
示例9: create
def create(self, request, body):
    """Create a schedule from ``body['schedule']`` and return it.

    Raises ``webob.exc.HTTPBadRequest`` when the body or its ``schedule``
    entity is missing.
    """
    schedule_body = body.get('schedule') if body is not None else None
    if schedule_body is None:
        raise webob.exc.HTTPBadRequest()

    # Copy the request data, then add the computed next run time.
    values = dict(schedule_body)
    values['next_run'] = self._schedule_to_next_run(schedule_body)

    created = self.db_api.schedule_create(values)
    utils.serialize_datetimes(created)
    return {'schedule': created}
开发者ID:komawar,项目名称:qonos,代码行数:11,代码来源:schedules.py
示例10: test_serialize_datetimes_nested_dict
def test_serialize_datetimes_nested_dict(self):
    """A datetime nested two dicts deep is serialized as well."""
    moment = datetime.datetime(2012, 5, 16, 15, 27, 36, 325355)
    payload = {'data': {'foo': {'bar': moment}}}

    utils.serialize_datetimes(payload)

    self.assertEqual(payload['data']['foo']['bar'], '2012-05-16T15:27:36Z')
开发者ID:isethi,项目名称:qonos,代码行数:11,代码来源:test_utils.py
示例11: test_serialize_datetimes_list
def test_serialize_datetimes_list(self):
    """Datetimes inside dicts that sit in a list are serialized too."""
    first = datetime.datetime(2012, 5, 16, 15, 27, 36, 325355)
    second = datetime.datetime(2013, 5, 16, 15, 27, 36, 325355)
    payload = {'data': [{'foo': first}, {'foo': second}]}

    utils.serialize_datetimes(payload)

    entries = payload['data']
    self.assertEqual(entries[0]['foo'], '2012-05-16T15:27:36Z')
    self.assertEqual(entries[1]['foo'], '2013-05-16T15:27:36Z')
开发者ID:isethi,项目名称:qonos,代码行数:12,代码来源:test_utils.py
示例12: update_job_status
def update_job_status(self, job_id, status, timeout=None,
                      error_message=None):
    """PUT a new status for ``job_id`` and return the response's ``status``.

    ``error_message`` is only sent when ``status`` is ``ERROR``
    (case-insensitive) and the message is truthy; ``timeout`` is only sent
    when truthy.
    """
    status_body = {'status': status}
    if error_message and status.upper() == 'ERROR':
        status_body['error_message'] = error_message
    if timeout:
        status_body['timeout'] = timeout

    body = {'status': status_body}
    # Any datetime values (e.g. a timeout) are serialized before sending.
    utils.serialize_datetimes(body)

    response = self._do_request('PUT', '/v1/jobs/%s/status' % job_id, body)
    return response['status']
开发者ID:isethi,项目名称:qonos,代码行数:12,代码来源:client.py
示例13: create
def create(self, request, body=None):
    """Create a schedule from ``body['schedule']`` and return it.

    Raises:
        webob.exc.HTTPBadRequest: ``body`` is missing, is not a dict, or
            has no ``schedule`` entity.

    Returns:
        ``{'schedule': <created schedule>}``
    """
    # isinstance() replaces the type-name string comparison; it rejects
    # None as well and also accepts dict subclasses.
    if not isinstance(body, dict) or body.get('schedule') is None:
        raise webob.exc.HTTPBadRequest()

    api_utils.deserialize_schedule_metadata(body['schedule'])
    values = {}
    values.update(body['schedule'])
    values['next_run'] = api_utils.schedule_to_next_run(body['schedule'])

    schedule = self.db_api.schedule_create(values)
    utils.serialize_datetimes(schedule)
    api_utils.serialize_schedule_metadata(schedule)
    return {'schedule': schedule}
开发者ID:isethi,项目名称:qonos,代码行数:14,代码来源:schedules.py
示例14: list
def list(self, request):
    """Return all jobs matching the request parameters under ``jobs``.

    Raises ``webob.exc.HTTPBadRequest`` when the pagination limit is
    invalid and ``webob.exc.HTTPNotFound`` when the DB layer raises
    ``exception.NotFound``.
    """
    query = self._get_request_params(request)
    try:
        query = utils.get_pagination_limit(query)
    except exception.Invalid as e:
        raise webob.exc.HTTPBadRequest(explanation=str(e))

    try:
        found = self.db_api.job_get_all(query)
    except exception.NotFound:
        raise webob.exc.HTTPNotFound()

    # Each job is serialized before the list is returned.
    for entry in found:
        utils.serialize_datetimes(entry)
        api_utils.serialize_job_metadata(entry)
    return {'jobs': found}
开发者ID:pperezrubio,项目名称:qonos,代码行数:15,代码来源:jobs.py
示例15: update
def update(self, request, schedule_id, body):
    """Update a schedule from ``body['schedule']`` and return the result.

    ``next_run`` is always recomputed from the request's schedule data.
    Raises ``webob.exc.HTTPBadRequest`` for a missing body or ``schedule``
    entity and ``webob.exc.HTTPNotFound`` when ``exception.NotFound`` is
    raised while updating.
    """
    schedule_body = body.get('schedule') if body is not None else None
    if schedule_body is None:
        raise webob.exc.HTTPBadRequest()

    try:
        values = dict(schedule_body)
        values['next_run'] = self._schedule_to_next_run(schedule_body)
        updated = self.db_api.schedule_update(schedule_id, values)
        utils.serialize_datetimes(updated)
    except exception.NotFound:
        raise webob.exc.HTTPNotFound(
            explanation=_('Schedule %s could not be found.') % schedule_id)
    return {'schedule': updated}
开发者ID:komawar,项目名称:qonos,代码行数:15,代码来源:schedules.py
示例16: create
def create(self, request, body):
    """Create a ``queued`` job for the schedule named in the request body.

    The schedule's ``next_run`` and ``last_scheduled`` are updated first,
    then a job is created that copies the schedule's tenant, action and
    metadata and receives timeouts based on the action's configured
    timeout.

    Raises:
        webob.exc.HTTPBadRequest: body, ``job`` or ``schedule_id`` missing.
        webob.exc.HTTPNotFound: the schedule could not be found.

    Returns:
        ``{'job': <created job>}``
    """
    if (body is None or body.get('job') is None or
            body['job'].get('schedule_id') is None):
        raise webob.exc.HTTPBadRequest()
    job = body['job']

    try:
        schedule = self.db_api.schedule_get_by_id(job['schedule_id'])
    except exception.NotFound:
        raise webob.exc.HTTPNotFound()

    # Update schedule last_scheduled and next_run
    # (the leftover debug print of next_run has been removed)
    values = {}
    values['next_run'] = api_utils.schedule_to_next_run(schedule)
    values['last_scheduled'] = timeutils.utcnow()
    self.db_api.schedule_update(schedule['id'], values)

    # Create job
    values = {}
    values.update(job)
    values['tenant'] = schedule['tenant']
    values['action'] = schedule['action']
    values['status'] = 'queued'
    # Only key/value are copied from each schedule metadata entry.
    values['job_metadata'] = [
        {'key': metadata['key'], 'value': metadata['value']}
        for metadata in schedule['schedule_metadata']
    ]

    now = timeutils.utcnow()
    job_timeout_seconds = self._job_get_timeout(values['action'])
    timeout_delta = datetime.timedelta(seconds=job_timeout_seconds)
    if 'timeout' not in values:
        values['timeout'] = now + timeout_delta
    # NOTE(review): the original indentation was lost in this copy, so it
    # is unclear whether hard_timeout was set only when 'timeout' was
    # missing. It is set unconditionally here -- confirm against upstream.
    values['hard_timeout'] = now + timeout_delta

    job = self.db_api.job_create(values)
    utils.serialize_datetimes(job)
    api_utils.serialize_job_metadata(job)
    return {'job': job}
开发者ID:isethi,项目名称:qonos,代码行数:48,代码来源:jobs.py
示例17: create
def create(self, request, body=None):
    """Create a schedule from ``body['schedule']`` and return it.

    Raises ``webob.exc.HTTPBadRequest`` when the body is empty or has no
    ``schedule`` entity.
    """
    if not body:
        raise webob.exc.HTTPBadRequest(
            explanation=_('The request body must not be empty'))
    if 'schedule' not in body:
        raise webob.exc.HTTPBadRequest(
            explanation=_('The request body must contain a "schedule" '
                          'entity'))

    schedule_body = body['schedule']
    api_utils.deserialize_schedule_metadata(schedule_body)

    # Copy the request data, then add the computed next run time.
    values = dict(schedule_body)
    values['next_run'] = api_utils.schedule_to_next_run(schedule_body)

    created = self.db_api.schedule_create(values)
    utils.serialize_datetimes(created)
    api_utils.serialize_schedule_metadata(created)
    return {'schedule': created}
开发者ID:pperezrubio,项目名称:qonos,代码行数:17,代码来源:schedules.py
示例18: get_next_job
def get_next_job(self, request, worker_id, body):
    """Assign the next job for ``body['action']`` to the given worker.

    Raises ``webob.exc.HTTPNotFound`` when the worker does not exist. The
    ``job`` value in the response is whatever the DB layer returned and is
    only serialized when it is truthy.
    """
    action = body.get('action')

    # Check that worker exists
    try:
        self.db_api.worker_get_by_id(worker_id)
    except exception.NotFound:
        raise webob.exc.HTTPNotFound(
            explanation=_('Worker %s could not be found.') % worker_id)

    retry_limit = self._job_get_max_retry(action)
    assigned = self.db_api.job_get_and_assign_next_by_action(
        action, worker_id, retry_limit)
    if assigned:
        utils.serialize_datetimes(assigned)
        api_utils.serialize_job_metadata(assigned)
    return {'job': assigned}
开发者ID:isethi,项目名称:qonos,代码行数:17,代码来源:workers.py
示例19: update
def update(self, request, schedule_id, body):
    """Update an existing schedule from ``body['schedule']``.

    When any time field is supplied, all five time fields are rewritten
    (absent ones become ``None``) and ``next_run`` is recalculated.

    Raises ``webob.exc.HTTPBadRequest`` for an empty body or missing
    ``schedule`` entity, ``webob.exc.HTTPForbidden`` when a read-only
    property is supplied, and ``webob.exc.HTTPNotFound`` when the schedule
    does not exist.
    """
    if not body:
        raise webob.exc.HTTPBadRequest(
            explanation=_('The request body must not be empty'))
    if 'schedule' not in body:
        raise webob.exc.HTTPBadRequest(
            explanation=_('The request body must contain a "schedule" '
                          'entity'))

    schedule_body = body['schedule']
    api_utils.deserialize_schedule_metadata(schedule_body)
    values = dict(schedule_body)

    try:
        values = api_utils.check_read_only_properties(values)
    except exception.Forbidden as e:
        raise webob.exc.HTTPForbidden(explanation=unicode(e))

    time_fields = ('minute', 'hour', 'month', 'day_of_week', 'day_of_month')
    if any(field in values for field in time_fields):
        # NOTE(ameade): We must recalculate the schedules next_run time
        # since the schedule has changed
        times = dict((field, values[field] if field in values else None)
                     for field in time_fields)
        values.update(times)
        values['next_run'] = api_utils.schedule_to_next_run(times)

    try:
        updated = self.db_api.schedule_update(schedule_id, values)
    except exception.NotFound:
        raise webob.exc.HTTPNotFound(
            explanation=_('Schedule %s could not be found.') % schedule_id)

    utils.serialize_datetimes(updated)
    api_utils.serialize_schedule_metadata(updated)
    return {'schedule': updated}
开发者ID:pperezrubio,项目名称:qonos,代码行数:45,代码来源:schedules.py
示例20: list
def list(self, request):
    """Return one page of schedules plus a ``next`` link.

    The ``next`` link's ``href`` is set only when the page is non-empty and
    exactly as long as the pagination limit; otherwise it is ``None``.

    Raises:
        webob.exc.HTTPBadRequest: the pagination limit is invalid.
        webob.exc.HTTPNotFound: the DB layer raised ``exception.NotFound``
            (reported as an unknown marker).

    Returns:
        ``{'schedules': [...], 'schedules_links': [{'rel': 'next',
        'href': <url or None>}]}``
    """
    filter_args = self._get_request_params(request)
    try:
        filter_args = utils.get_pagination_limit(filter_args)
    except exception.Invalid as e:
        raise webob.exc.HTTPBadRequest(explanation=str(e))

    # Only the DB call is guarded by the NotFound handler.
    try:
        schedules = self.db_api.schedule_get_all(filter_args=filter_args)
    except exception.NotFound:
        msg = _('The specified marker could not be found')
        raise webob.exc.HTTPNotFound(explanation=msg)

    # A full page suggests more results, so link to the page after the
    # last returned schedule.
    if len(schedules) != 0 and len(schedules) == filter_args['limit']:
        next_page = '/v1/schedules?marker=%s' % schedules[-1].get('id')
    else:
        next_page = None

    for sched in schedules:
        utils.serialize_datetimes(sched)
        api_utils.serialize_schedule_metadata(sched)

    links = [{'rel': 'next', 'href': next_page}]
    return {'schedules': schedules, 'schedules_links': links}
开发者ID:isethi,项目名称:qonos,代码行数:20,代码来源:schedules.py
注:本文中的qonos.common.utils.serialize_datetimes函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论