本文整理汇总了 Python 中 st2common.util.isotime.parse 函数的典型用法代码示例。如果您正苦于以下问题:Python parse 函数的具体用法?Python parse 怎么用?Python parse 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 parse 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_descending_sort
def test_descending_sort(self):
    """Check that ``sort_desc=True`` lists executions latest-start first.

    Only the two ends of the listing are compared: the last entry's
    ``start_timestamp`` must be strictly earlier than the first entry's.
    """
    response = self.app.get('/v1/executions?sort_desc=True')
    self.assertEqual(response.status_int, 200)
    self.assertIsInstance(response.json, list)

    first_started = response.json[0]['start_timestamp']
    last_started = response.json[-1]['start_timestamp']
    self.assertLess(isotime.parse(last_started), isotime.parse(first_started))
开发者ID:StackStorm,项目名称:st2,代码行数:7,代码来源:test_executions_filters.py
示例2: test_default_sort
def test_default_sort(self):
    """Check that the default history listing is ordered latest-start first.

    Only the two ends of the listing are compared: the last entry's
    execution ``start_timestamp`` must be strictly earlier than the first's.
    """
    response = self.app.get('/history/executions')
    self.assertEqual(response.status_int, 200)
    self.assertIsInstance(response.json, list)

    first_started = response.json[0]['execution']['start_timestamp']
    last_started = response.json[-1]['execution']['start_timestamp']
    self.assertLess(isotime.parse(last_started), isotime.parse(first_started))
开发者ID:bjoernbessert,项目名称:st2,代码行数:7,代码来源:test_history.py
示例3: purge_executions
def purge_executions(timestamp=None, action_ref=None, purge_incomplete=False):
    """Query executions older than ``timestamp`` and purge each of them.

    :param timestamp: Cut-off value. It is formatted with ``strftime`` for
                      logging and passed to ``isotime.parse`` for the query.
    :param action_ref: Optional action reference; when given, the query is
                       restricted to executions of that action.
    :param purge_incomplete: When true, only the start time is filtered on.
                             Otherwise the end time must also be older than
                             the cut-off and the status must be one of
                             ``DONE_STATES``.
    """
    if not timestamp:
        LOG.error("Specify a valid timestamp to purge.")
        return

    LOG.info("Purging executions older than timestamp: %s" % timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))

    filters = {}
    if action_ref:
        filters["action__ref"] = action_ref

    if purge_incomplete:
        filters["start_timestamp__lt"] = isotime.parse(timestamp)
    else:
        # Completed executions only: both timestamps before the cut-off and
        # a status from DONE_STATES.
        filters.update({
            "end_timestamp__lt": isotime.parse(timestamp),
            "start_timestamp__lt": isotime.parse(timestamp),
            "status": {"$in": DONE_STATES},
        })

    # XXX: Think about paginating this call.
    matching = ActionExecution.query(**filters)
    LOG.info("#### Total number of executions to delete: %d" % len(matching))

    # Purge execution and liveaction models now
    for execution_db in matching:
        _purge_models(execution_db)

    # Print stats
    LOG.info("#### Total execution models deleted: %d" % DELETED_COUNT)
开发者ID:beryah,项目名称:st2,代码行数:28,代码来源:purge_executions.py
示例4: _process_datetime_range_filters
def _process_datetime_range_filters(self, filters, order_by=None):
    """Expand ``"<a>..<b>"`` string filters into ``__gte`` / ``__lte`` bounds.

    Every filter whose value is a string containing ``..`` is removed from
    ``filters`` (which is mutated in place) and replaced by a lower and an
    upper bound parsed with ``isotime.parse``. The order in which the two
    endpoints were given decides the sort direction for that field.

    :param filters: Mapping of filter name to value; modified in place.
    :param order_by: Optional list of sort keys; it is copied, not mutated.
    :return: Tuple of ``(filters, order_by_list)``.
    """
    range_filters = {}
    for name, raw in filters.iteritems():
        if type(raw) in (str, unicode) and ".." in raw:
            range_filters[name] = raw

    ordering = copy.deepcopy(order_by) if order_by else []

    for name, raw in range_filters.iteritems():
        endpoints = raw.split("..")
        first = isotime.parse(endpoints[0])
        second = isotime.parse(endpoints[1])
        lower_key = "%s__gte" % name
        upper_key = "%s__lte" % name

        if first < second:
            # Ascending range -> ascending sort on this field.
            lower, upper = first, second
            wanted, opposite = name, "-" + name
        else:
            # Endpoints given in reverse -> descending sort on this field.
            lower, upper = second, first
            wanted, opposite = "-" + name, name

        del filters[name]
        filters[lower_key] = lower
        filters[upper_key] = upper

        if opposite in ordering:
            # Flip the direction of an existing sort key, keeping its position.
            ordering[ordering.index(opposite)] = wanted
        elif wanted not in ordering:
            ordering.insert(0, wanted)

    return filters, ordering
开发者ID:ipv1337,项目名称:st2,代码行数:28,代码来源:__init__.py
示例5: _process_datetime_range_filters
def _process_datetime_range_filters(self, filters, order_by=None):
    """Expand ``'<a>..<b>'`` string filters into ``__gte`` / ``__lte`` bounds.

    Every filter whose value is a string containing ``..`` is removed from
    ``filters`` (which is mutated in place) and replaced by a lower and an
    upper bound parsed with ``isotime.parse``. The order in which the two
    endpoints were given decides the sort direction for that field.

    :param filters: Mapping of filter name to value; modified in place.
    :param order_by: Optional list of sort keys; it is copied, not mutated.
    :return: Tuple of ``(filters, order_by_list)``.
    """
    range_filters = {}
    for name, raw in six.iteritems(filters):
        if type(raw) in (str, six.text_type) and '..' in raw:
            range_filters[name] = raw

    ordering = copy.deepcopy(order_by) if order_by else []

    for name, raw in six.iteritems(range_filters):
        endpoints = raw.split('..')
        first = isotime.parse(endpoints[0])
        second = isotime.parse(endpoints[1])
        lower_key = '%s__gte' % name
        upper_key = '%s__lte' % name

        if first < second:
            # Ascending range -> ascending sort on this field.
            lower, upper = first, second
            wanted, opposite = name, '-' + name
        else:
            # Endpoints given in reverse -> descending sort on this field.
            lower, upper = second, first
            wanted, opposite = '-' + name, name

        del filters[name]
        filters[lower_key] = lower
        filters[upper_key] = upper

        if opposite in ordering:
            # Flip the direction of an existing sort key, keeping its position.
            ordering[ordering.index(opposite)] = wanted
        elif wanted not in ordering:
            ordering.insert(0, wanted)

    return filters, ordering
开发者ID:nzlosh,项目名称:st2,代码行数:29,代码来源:__init__.py
示例6: to_model
def to_model(cls, live_action):
    """Build a ``cls.model`` instance from a live action API object.

    Timestamps are parsed with ``isotime.parse`` and ``notify`` is converted
    with ``NotificationsHelper.to_model``; each of these falls back to
    ``None`` when the attribute is missing or falsy.

    :param live_action: Object exposing at least an ``action`` attribute.
    :return: The newly constructed model.
    """
    fields = {
        'name': getattr(live_action, 'name', None),
        'description': getattr(live_action, 'description', None),
        'action': live_action.action,
    }

    raw_start = getattr(live_action, 'start_timestamp', None)
    fields['start_timestamp'] = isotime.parse(raw_start) if raw_start else None

    raw_end = getattr(live_action, 'end_timestamp', None)
    fields['end_timestamp'] = isotime.parse(raw_end) if raw_end else None

    fields['status'] = getattr(live_action, 'status', None)
    # Each of these defaults to its own fresh empty dict when absent.
    for attr in ('parameters', 'context', 'callback'):
        fields[attr] = getattr(live_action, attr, dict())
    fields['result'] = getattr(live_action, 'result', None)

    raw_notify = getattr(live_action, 'notify', None)
    fields['notify'] = NotificationsHelper.to_model(raw_notify) if raw_notify else None

    return cls.model(**fields)
开发者ID:hejin,项目名称:st2,代码行数:32,代码来源:action.py
示例7: _get_updated_action_exec_result
def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result):
    """Re-format a task result with refreshed timestamps.

    ``created_at`` is always parsed from the previous task result.
    ``updated_at`` is the live action's ``end_timestamp`` once its status is
    one of ``LIVEACTION_COMPLETED_STATES``; otherwise it is parsed from the
    previous result's ``updated_at``.

    :return: Whatever ``self._format_action_exec_result`` returns.
    """
    completed = liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES
    created_at = isotime.parse(prev_task_result['created_at'])
    if completed:
        updated_at = liveaction.end_timestamp
    else:
        updated_at = isotime.parse(prev_task_result['updated_at'])
    return self._format_action_exec_result(action_node, liveaction, created_at, updated_at)
开发者ID:nzlosh,项目名称:st2,代码行数:9,代码来源:action_chain_runner.py
示例8: test_get_all
def test_get_all(self):
    """Post two live actions and check the listing returns both, sorted.

    Each entry's ``start_timestamp`` must be greater than or equal to the
    one that follows it.
    """
    for live_action in (LIVE_ACTION_1, LIVE_ACTION_2):
        self._get_actionexecution_id(self._do_post(live_action))

    resp = self.app.get("/v1/executions")
    body = resp.json
    self.assertEqual(resp.status_int, 200)
    self.assertEqual(len(resp.json), 2, "/v1/executions did not return all actionexecutions.")

    # Assert liveactions are sorted by timestamp.
    for current, following in zip(body, body[1:]):
        current_start = isotime.parse(current["start_timestamp"])
        following_start = isotime.parse(following["start_timestamp"])
        self.assertTrue(current_start >= following_start)
开发者ID:enykeev,项目名称:st2,代码行数:10,代码来源:test_executions_simple.py
示例9: to_model
def to_model(cls, instance):
    """Copy schema-declared attributes from ``instance`` onto a new model.

    For every property in ``cls.schema`` the value is read from ``instance``
    (falling back to a copy of the schema default). Falsy values are skipped
    unless the model field is required, and attributes listed in
    ``ActionExecutionAPI.SKIP`` are never copied. The two timestamps are
    always set from ``isotime.parse``.

    :return: The populated model.
    """
    model = cls.model()
    properties = cls.schema.get('properties', dict())

    for attr, meta in six.iteritems(properties):
        default = copy.deepcopy(meta.get('default', None))
        value = getattr(instance, attr, default)
        # The field's ``required`` flag is only consulted for falsy values.
        wanted = value or cls.model._fields[attr].required
        if wanted and attr not in ActionExecutionAPI.SKIP:
            setattr(model, attr, value)

    model.start_timestamp = isotime.parse(instance.start_timestamp)
    model.end_timestamp = isotime.parse(instance.end_timestamp)
    return model
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:12,代码来源:execution.py
示例10: test_write_marker_to_db
def test_write_marker_to_db(self):
    """The marker written by the dumper is a string that parses back to the
    timestamp it was created from."""
    dumper = Dumper(
        queue=self.get_queue(),
        export_dir='/tmp',
        batch_size=5,
        max_files_per_sleep=1,
        file_prefix='st2-stuff-',
        file_format='json',
    )

    # Use the latest end timestamp across the fixture executions as marker.
    max_timestamp = max(
        isotime.parse(execution.end_timestamp) for execution in self.execution_apis
    )

    persisted_marker = dumper._write_marker_to_db(max_timestamp).marker
    self.assertTrue(isinstance(persisted_marker, six.string_types))
    self.assertEqual(isotime.parse(persisted_marker), max_timestamp)
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:12,代码来源:test_dumper_integration.py
示例11: test_get_all
def test_get_all(self):
    """Post two action executions and check the listing returns both, sorted.

    Each entry's ``start_timestamp`` must be greater than or equal to the
    one that follows it.
    """
    for payload in (ACTION_EXECUTION_1, ACTION_EXECUTION_2):
        self._get_actionexecution_id(self._do_post(payload))

    resp = self.app.get('/v1/actionexecutions')
    body = resp.json

    # Assert executions are sorted by timestamp.
    for i in range(len(body) - 1):
        current_start = isotime.parse(body[i]['start_timestamp'])
        following_start = isotime.parse(body[i + 1]['start_timestamp'])
        self.assertTrue(current_start >= following_start)

    self.assertEqual(resp.status_int, 200)
    self.assertEqual(len(resp.json), 2,
                     '/v1/actionexecutions did not return all actionexecutions.')
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:13,代码来源:test_actionexecutions.py
示例12: _purge_executions
def _purge_executions(timestamp=None, action_ref=None):
if not timestamp:
print('Specify a valid timestamp to purge.')
return
if not action_ref:
action_ref = ''
print('Purging executions older than timestamp: %s' %
timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))
def should_delete(execution_db):
if action_ref != '':
return (execution_db.liveaction['action'] == action_ref
and execution_db.start_timestamp < timestamp)
else:
return execution_db.start_timestamp < timestamp
# XXX: Think about paginating this call.
filters = {'start_timestamp__lt': isotime.parse(timestamp)}
executions = ActionExecution.query(**filters)
executions_to_delete = filter(should_delete, executions)
print('#### Total number of executions to delete: %d' % len(executions_to_delete))
# Purge execution and liveaction models now
for execution_db in executions_to_delete:
_purge_action_models(execution_db)
# Print stats
print('#### Total execution models deleted: %d' % DELETED_COUNT)
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:30,代码来源:purge_executions.py
示例13: to_model
def to_model(cls, instance):
    """Build a ``cls.model`` from ``instance``'s user, token and expiry.

    ``user`` and ``token`` are converted with ``str`` and ``expiry`` is
    parsed with ``isotime.parse``; any of them that is falsy becomes ``None``.
    """
    user = token = expiry = None
    if instance.user:
        user = str(instance.user)
    if instance.token:
        token = str(instance.token)
    if instance.expiry:
        expiry = isotime.parse(instance.expiry)
    return cls.model(user=user, token=token, expiry=expiry)
开发者ID:nzlosh,项目名称:st2,代码行数:7,代码来源:auth.py
示例14: purge_trigger_instances
def purge_trigger_instances(logger, timestamp):
    """
    Delete trigger instances whose occurrence time is before ``timestamp``.

    :param logger: Logger used to report progress and failures.

    :param timestamp: Trigger instances older than this timestamp will be deleted.
    :type timestamp: ``datetime.datetime``

    :raises ValueError: If ``timestamp`` is falsy.
    :raises InvalidQueryError: If the delete query is rejected; re-raised with
                               a message that includes the query filters.
    """
    if not timestamp:
        raise ValueError('Specify a valid timestamp to purge.')

    logger.info('Purging trigger instances older than timestamp: %s',
                timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))

    query_filters = {'occurrence_time__lt': isotime.parse(timestamp)}

    # TODO: Update this code to return statistics on deleted objects once we
    # upgrade to newer version of MongoDB where delete_by_query actually returns
    # some data
    try:
        TriggerInstance.delete_by_query(**query_filters)
    except InvalidQueryError as e:
        msg = ('Bad query (%s) used to delete trigger instances: %s. '
               'Please contact support.' % (query_filters, str(e)))
        raise InvalidQueryError(msg)
    except Exception:
        # Best effort: any other failure is logged and not propagated.
        # ``Exception`` (rather than a bare ``except``) lets
        # KeyboardInterrupt / SystemExit through.
        logger.exception('Deleting instances using query_filters %s failed.', query_filters)
    else:
        # Only claim success when the delete call actually succeeded.
        logger.info('All trigger instance models older than timestamp %s were deleted.', timestamp)
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:28,代码来源:trigger_instances.py
示例15: to_model
def to_model(cls, instance):
    """Copy schema-declared attributes from ``instance`` onto a new model.

    For every property in ``cls.schema`` the value is read from ``instance``
    (falling back to a copy of the schema default). Falsy values are skipped
    unless the model field is required, and attributes listed in
    ``ActionExecutionAPI.SKIP`` are never copied. The two timestamps are
    always set from ``isotime.parse``.

    :return: The populated model.
    """
    model = cls.model()
    properties = cls.schema.get('properties', dict())

    for attr, meta in six.iteritems(properties):
        default = copy.deepcopy(meta.get('default', None))
        value = getattr(instance, attr, default)

        # pylint: disable=no-member
        # TODO: Add plugin which lets pylint know each MongoEngine document has _fields
        # attribute
        # The field's ``required`` flag is only consulted for falsy values.
        wanted = value or cls.model._fields[attr].required
        if wanted and attr not in ActionExecutionAPI.SKIP:
            setattr(model, attr, value)

    model.start_timestamp = isotime.parse(instance.start_timestamp)
    model.end_timestamp = isotime.parse(instance.end_timestamp)
    return model
开发者ID:saydulk,项目名称:st2,代码行数:16,代码来源:execution.py
示例16: test_crud_partial
def test_crud_partial(self):
    """Create, update and delete a history record built from the first
    sub-task fixture, whose trigger and rule fields are expected empty."""
    fixture = self.fake_history_subtasks[0]

    # Create the DB record.
    obj = ActionExecutionHistoryAPI(**copy.deepcopy(fixture))
    ActionExecutionHistory.add_or_update(ActionExecutionHistoryAPI.to_model(obj))
    model = ActionExecutionHistory.get_by_id(obj.id)
    self.assertEqual(str(model.id), obj.id)

    for empty_field in ('trigger', 'trigger_type', 'trigger_instance', 'rule'):
        self.assertDictEqual(getattr(model, empty_field), {})
    for copied_field in ('action', 'runner'):
        self.assertDictEqual(getattr(model, copied_field), fixture[copied_field])

    # The stored execution holds a parsed timestamp, so parse the fixture's
    # value before comparing.
    expected_execution = copy.deepcopy(fixture['execution'])
    expected_execution['start_timestamp'] = isotime.parse(expected_execution['start_timestamp'])
    self.assertDictEqual(model.execution, expected_execution)
    self.assertEqual(model.parent, fixture['parent'])
    self.assertListEqual(model.children, [])

    # Update the DB record.
    children = [str(bson.ObjectId()), str(bson.ObjectId())]
    model.children = children
    ActionExecutionHistory.add_or_update(model)
    model = ActionExecutionHistory.get_by_id(obj.id)
    self.assertListEqual(model.children, children)

    # Delete the DB record.
    ActionExecutionHistory.delete(model)
    self.assertRaises(ValueError, ActionExecutionHistory.get_by_id, obj.id)
开发者ID:bjoernbessert,项目名称:st2,代码行数:28,代码来源:test_history.py
示例17: to_model
def to_model(cls, token):
    """Populate the parent class's model with the token's own fields.

    ``user`` and ``token`` are converted with ``str`` and ``expiry`` is parsed
    with ``isotime.parse`` (each becomes ``None`` when falsy); ``ttl`` is
    copied as-is and defaults to ``None`` when absent.
    """
    def _text_or_none(value):
        return str(value) if value else None

    # NOTE(review): ``super(cls, cls)`` resolves relative to whichever class
    # this is invoked on - confirm it is never reached through a subclass.
    model = super(cls, cls).to_model(token)
    model.user = _text_or_none(token.user)
    model.token = _text_or_none(token.token)
    model.ttl = getattr(token, 'ttl', None)
    model.expiry = isotime.parse(token.expiry) if token.expiry else None
    return model
开发者ID:bjoernbessert,项目名称:st2,代码行数:7,代码来源:access.py
示例18: purge_trigger_instances
def purge_trigger_instances(logger, timestamp):
    """
    Delete trigger instances whose occurrence time is before ``timestamp``.

    :param logger: Logger used to report progress and failures.

    :param timestamp: Trigger instances older than this timestamp will be deleted.
    :type timestamp: ``datetime.datetime``

    :raises ValueError: If ``timestamp`` is falsy.
    :raises InvalidQueryError: If the delete query is rejected; re-raised with
                               a message that includes the query filters.
    """
    if not timestamp:
        raise ValueError('Specify a valid timestamp to purge.')

    logger.info('Purging trigger instances older than timestamp: %s',
                timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))

    query_filters = {'occurrence_time__lt': isotime.parse(timestamp)}

    try:
        deleted_count = TriggerInstance.delete_by_query(**query_filters)
    except InvalidQueryError as e:
        msg = ('Bad query (%s) used to delete trigger instances: %s. '
               'Please contact support.' % (query_filters, str(e)))
        raise InvalidQueryError(msg)
    except Exception:
        # Best effort: any other failure is logged and not propagated.
        # ``Exception`` (rather than a bare ``except``) lets
        # KeyboardInterrupt / SystemExit through.
        logger.exception('Deleting instances using query_filters %s failed.', query_filters)
    else:
        logger.info('Deleted %s trigger instance objects', deleted_count)
        # Only claim success when the delete call actually succeeded.
        logger.info('All trigger instance models older than timestamp %s were deleted.', timestamp)
开发者ID:lyandut,项目名称:st2,代码行数:26,代码来源:trigger_instances.py
示例19: to_model
def to_model(cls, instance):
    """Build a ``cls.model`` from ``instance``'s trigger, payload and
    occurrence time, the latter parsed with ``isotime.parse``."""
    return cls.model(
        trigger=instance.trigger,
        payload=instance.payload,
        occurrence_time=isotime.parse(instance.occurrence_time),
    )
开发者ID:KenMercusLai,项目名称:st2,代码行数:7,代码来源:trigger.py
示例20: test_get_all
def test_get_all(self):
    """Post two live actions and check the listing returns both, sorted.

    For every entry except the last, the test also checks that ``web_url`` is
    present and that ``elapsed_seconds`` accompanies any ``end_timestamp``.
    """
    for live_action in (LIVE_ACTION_1, LIVE_ACTION_2):
        self._get_actionexecution_id(self._do_post(live_action))

    resp = self.app.get('/v1/executions')
    body = resp.json
    self.assertEqual(resp.status_int, 200)
    self.assertEqual(len(resp.json), 2,
                     '/v1/executions did not return all actionexecutions.')

    # Assert liveactions are sorted by timestamp.
    for current, following in zip(body, body[1:]):
        current_start = isotime.parse(current['start_timestamp'])
        following_start = isotime.parse(following['start_timestamp'])
        self.assertTrue(current_start >= following_start)
        self.assertTrue('web_url' in current)
        if 'end_timestamp' in current:
            self.assertTrue('elapsed_seconds' in current)
开发者ID:LindsayHill,项目名称:st2,代码行数:16,代码来源:test_executions_simple.py
注:本文中的 st2common.util.isotime.parse 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论