• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python isotime.parse函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中st2common.util.isotime.parse函数的典型用法代码示例。如果您正苦于以下问题:Python parse函数的具体用法?Python parse怎么用?Python parse使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了parse函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_descending_sort

 def test_descending_sort(self):
     response = self.app.get('/v1/executions?sort_desc=True')
     self.assertEqual(response.status_int, 200)
     self.assertIsInstance(response.json, list)
     dt1 = response.json[0]['start_timestamp']
     dt2 = response.json[len(response.json) - 1]['start_timestamp']
     self.assertLess(isotime.parse(dt2), isotime.parse(dt1))
开发者ID:StackStorm,项目名称:st2,代码行数:7,代码来源:test_executions_filters.py


示例2: test_default_sort

 def test_default_sort(self):
     response = self.app.get('/history/executions')
     self.assertEqual(response.status_int, 200)
     self.assertIsInstance(response.json, list)
     dt1 = response.json[0]['execution']['start_timestamp']
     dt2 = response.json[len(response.json) - 1]['execution']['start_timestamp']
     self.assertLess(isotime.parse(dt2), isotime.parse(dt1))
开发者ID:bjoernbessert,项目名称:st2,代码行数:7,代码来源:test_history.py


示例3: purge_executions

def purge_executions(timestamp=None, action_ref=None, purge_incomplete=False):
    if not timestamp:
        LOG.error("Specify a valid timestamp to purge.")
        return

    LOG.info("Purging executions older than timestamp: %s" % timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))

    filters = {}
    if action_ref:
        filters["action__ref"] = action_ref

    if purge_incomplete:
        filters["start_timestamp__lt"] = isotime.parse(timestamp)
    else:
        filters["end_timestamp__lt"] = isotime.parse(timestamp)
        filters["start_timestamp__lt"] = isotime.parse(timestamp)
        filters["status"] = {"$in": DONE_STATES}

    # XXX: Think about paginating this call.
    executions = ActionExecution.query(**filters)
    LOG.info("#### Total number of executions to delete: %d" % len(executions))

    # Purge execution and liveaction models now
    for execution_db in executions:
        _purge_models(execution_db)

    # Print stats
    LOG.info("#### Total execution models deleted: %d" % DELETED_COUNT)
开发者ID:beryah,项目名称:st2,代码行数:28,代码来源:purge_executions.py


示例4: _process_datetime_range_filters

    def _process_datetime_range_filters(self, filters, order_by=None):
        ranges = {k: v for k, v in filters.iteritems() if type(v) in [str, unicode] and ".." in v}

        order_by_list = copy.deepcopy(order_by) if order_by else []
        for k, v in ranges.iteritems():
            values = v.split("..")
            dt1 = isotime.parse(values[0])
            dt2 = isotime.parse(values[1])

            k__gte = "%s__gte" % k
            k__lte = "%s__lte" % k
            if dt1 < dt2:
                query = {k__gte: dt1, k__lte: dt2}
                sort_key, reverse_sort_key = k, "-" + k
            else:
                query = {k__gte: dt2, k__lte: dt1}
                sort_key, reverse_sort_key = "-" + k, k
            del filters[k]
            filters.update(query)

            if reverse_sort_key in order_by_list:
                idx = order_by_list.index(reverse_sort_key)
                order_by_list.pop(idx)
                order_by_list.insert(idx, sort_key)
            elif sort_key not in order_by_list:
                order_by_list = [sort_key] + order_by_list

        return filters, order_by_list
开发者ID:ipv1337,项目名称:st2,代码行数:28,代码来源:__init__.py


示例5: _process_datetime_range_filters

    def _process_datetime_range_filters(self, filters, order_by=None):
        ranges = {k: v for k, v in six.iteritems(filters)
                  if type(v) in [str, six.text_type] and '..' in v}

        order_by_list = copy.deepcopy(order_by) if order_by else []
        for k, v in six.iteritems(ranges):
            values = v.split('..')
            dt1 = isotime.parse(values[0])
            dt2 = isotime.parse(values[1])

            k__gte = '%s__gte' % k
            k__lte = '%s__lte' % k
            if dt1 < dt2:
                query = {k__gte: dt1, k__lte: dt2}
                sort_key, reverse_sort_key = k, '-' + k
            else:
                query = {k__gte: dt2, k__lte: dt1}
                sort_key, reverse_sort_key = '-' + k, k
            del filters[k]
            filters.update(query)

            if reverse_sort_key in order_by_list:
                idx = order_by_list.index(reverse_sort_key)
                order_by_list.pop(idx)
                order_by_list.insert(idx, sort_key)
            elif sort_key not in order_by_list:
                order_by_list = [sort_key] + order_by_list

        return filters, order_by_list
开发者ID:nzlosh,项目名称:st2,代码行数:29,代码来源:__init__.py


示例6: to_model

    def to_model(cls, live_action):
        name = getattr(live_action, 'name', None)
        description = getattr(live_action, 'description', None)
        action = live_action.action

        if getattr(live_action, 'start_timestamp', None):
            start_timestamp = isotime.parse(live_action.start_timestamp)
        else:
            start_timestamp = None

        if getattr(live_action, 'end_timestamp', None):
            end_timestamp = isotime.parse(live_action.end_timestamp)
        else:
            end_timestamp = None

        status = getattr(live_action, 'status', None)
        parameters = getattr(live_action, 'parameters', dict())
        context = getattr(live_action, 'context', dict())
        callback = getattr(live_action, 'callback', dict())
        result = getattr(live_action, 'result', None)

        if getattr(live_action, 'notify', None):
            notify = NotificationsHelper.to_model(live_action.notify)
        else:
            notify = None

        model = cls.model(name=name, description=description, action=action,
                          start_timestamp=start_timestamp, end_timestamp=end_timestamp,
                          status=status, parameters=parameters, context=context,
                          callback=callback, result=result, notify=notify)

        return model
开发者ID:hejin,项目名称:st2,代码行数:32,代码来源:action.py


示例7: _get_updated_action_exec_result

    def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result):
        if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES:
            created_at = isotime.parse(prev_task_result['created_at'])
            updated_at = liveaction.end_timestamp
        else:
            created_at = isotime.parse(prev_task_result['created_at'])
            updated_at = isotime.parse(prev_task_result['updated_at'])

        return self._format_action_exec_result(action_node, liveaction, created_at, updated_at)
开发者ID:nzlosh,项目名称:st2,代码行数:9,代码来源:action_chain_runner.py


示例8: test_get_all

 def test_get_all(self):
     self._get_actionexecution_id(self._do_post(LIVE_ACTION_1))
     self._get_actionexecution_id(self._do_post(LIVE_ACTION_2))
     resp = self.app.get("/v1/executions")
     body = resp.json
     self.assertEqual(resp.status_int, 200)
     self.assertEqual(len(resp.json), 2, "/v1/executions did not return all " "actionexecutions.")
     # Assert liveactions are sorted by timestamp.
     for i in range(len(body) - 1):
         self.assertTrue(isotime.parse(body[i]["start_timestamp"]) >= isotime.parse(body[i + 1]["start_timestamp"]))
开发者ID:enykeev,项目名称:st2,代码行数:10,代码来源:test_executions_simple.py


示例9: to_model

 def to_model(cls, instance):
     model = cls.model()
     for attr, meta in six.iteritems(cls.schema.get('properties', dict())):
         default = copy.deepcopy(meta.get('default', None))
         value = getattr(instance, attr, default)
         if not value and not cls.model._fields[attr].required:
             continue
         if attr not in ActionExecutionAPI.SKIP:
             setattr(model, attr, value)
     model.start_timestamp = isotime.parse(instance.start_timestamp)
     model.end_timestamp = isotime.parse(instance.end_timestamp)
     return model
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:12,代码来源:execution.py


示例10: test_write_marker_to_db

 def test_write_marker_to_db(self):
     executions_queue = self.get_queue()
     dumper = Dumper(queue=executions_queue,
                     export_dir='/tmp', batch_size=5,
                     max_files_per_sleep=1,
                     file_prefix='st2-stuff-', file_format='json')
     timestamps = [isotime.parse(execution.end_timestamp) for execution in self.execution_apis]
     max_timestamp = max(timestamps)
     marker_db = dumper._write_marker_to_db(max_timestamp)
     persisted_marker = marker_db.marker
     self.assertTrue(isinstance(persisted_marker, six.string_types))
     self.assertEqual(isotime.parse(persisted_marker), max_timestamp)
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:12,代码来源:test_dumper_integration.py


示例11: test_get_all

 def test_get_all(self):
     self._get_actionexecution_id(self._do_post(ACTION_EXECUTION_1))
     self._get_actionexecution_id(self._do_post(ACTION_EXECUTION_2))
     resp = self.app.get('/v1/actionexecutions')
     body = resp.json
     # Assert executions are sorted by timestamp.
     for i in range(len(body) - 1):
         self.assertTrue(isotime.parse(body[i]['start_timestamp']) >=
                         isotime.parse(body[i + 1]['start_timestamp']))
     self.assertEqual(resp.status_int, 200)
     self.assertEqual(len(resp.json), 2,
                      '/v1/actionexecutions did not return all '
                      'actionexecutions.')
开发者ID:nagyist,项目名称:StackStorm-st2,代码行数:13,代码来源:test_actionexecutions.py


示例12: _purge_executions

def _purge_executions(timestamp=None, action_ref=None):
    if not timestamp:
        print('Specify a valid timestamp to purge.')
        return

    if not action_ref:
        action_ref = ''

    print('Purging executions older than timestamp: %s' %
          timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))

    def should_delete(execution_db):
        if action_ref != '':
            return (execution_db.liveaction['action'] == action_ref
                    and execution_db.start_timestamp < timestamp)
        else:
            return execution_db.start_timestamp < timestamp

    # XXX: Think about paginating this call.
    filters = {'start_timestamp__lt': isotime.parse(timestamp)}
    executions = ActionExecution.query(**filters)
    executions_to_delete = filter(should_delete, executions)
    print('#### Total number of executions to delete: %d' % len(executions_to_delete))

    # Purge execution and liveaction models now
    for execution_db in executions_to_delete:
        _purge_action_models(execution_db)

    # Print stats
    print('#### Total execution models deleted: %d' % DELETED_COUNT)
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:30,代码来源:purge_executions.py


示例13: to_model

    def to_model(cls, instance):
        user = str(instance.user) if instance.user else None
        token = str(instance.token) if instance.token else None
        expiry = isotime.parse(instance.expiry) if instance.expiry else None

        model = cls.model(user=user, token=token, expiry=expiry)
        return model
开发者ID:nzlosh,项目名称:st2,代码行数:7,代码来源:auth.py


示例14: purge_trigger_instances

def purge_trigger_instances(logger, timestamp):
    """
    :param timestamp: Trigger instances older than this timestamp will be deleted.
    :type timestamp: ``datetime.datetime
    """
    if not timestamp:
        raise ValueError('Specify a valid timestamp to purge.')

    logger.info('Purging trigger instances older than timestamp: %s' %
                timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))

    query_filters = {'occurrence_time__lt': isotime.parse(timestamp)}

    # TODO: Update this code to return statistics on deleted objects once we
    # upgrade to newer version of MongoDB where delete_by_query actually returns
    # some data

    try:
        TriggerInstance.delete_by_query(**query_filters)
    except InvalidQueryError as e:
        msg = ('Bad query (%s) used to delete trigger instances: %s'
               'Please contact support.' % (query_filters, str(e)))
        raise InvalidQueryError(msg)
    except:
        logger.exception('Deleting instances using query_filters %s failed.', query_filters)

    # Print stats
    logger.info('All trigger instance models older than timestamp %s were deleted.', timestamp)
开发者ID:AlexeyDeyneko,项目名称:st2,代码行数:28,代码来源:trigger_instances.py


示例15: to_model

    def to_model(cls, instance):
        model = cls.model()
        for attr, meta in six.iteritems(cls.schema.get('properties', dict())):
            default = copy.deepcopy(meta.get('default', None))
            value = getattr(instance, attr, default)

            # pylint: disable=no-member
            # TODO: Add plugin which lets pylint know each MongoEngine document has _fields
            # attribute
            if not value and not cls.model._fields[attr].required:
                continue
            if attr not in ActionExecutionAPI.SKIP:
                setattr(model, attr, value)
        model.start_timestamp = isotime.parse(instance.start_timestamp)
        model.end_timestamp = isotime.parse(instance.end_timestamp)
        return model
开发者ID:saydulk,项目名称:st2,代码行数:16,代码来源:execution.py


示例16: test_crud_partial

    def test_crud_partial(self):
        # Create the DB record.
        obj = ActionExecutionHistoryAPI(**copy.deepcopy(self.fake_history_subtasks[0]))
        ActionExecutionHistory.add_or_update(ActionExecutionHistoryAPI.to_model(obj))
        model = ActionExecutionHistory.get_by_id(obj.id)
        self.assertEqual(str(model.id), obj.id)
        self.assertDictEqual(model.trigger, {})
        self.assertDictEqual(model.trigger_type, {})
        self.assertDictEqual(model.trigger_instance, {})
        self.assertDictEqual(model.rule, {})
        self.assertDictEqual(model.action, self.fake_history_subtasks[0]['action'])
        self.assertDictEqual(model.runner, self.fake_history_subtasks[0]['runner'])
        doc = copy.deepcopy(self.fake_history_subtasks[0]['execution'])
        doc['start_timestamp'] = isotime.parse(doc['start_timestamp'])
        self.assertDictEqual(model.execution, doc)
        self.assertEqual(model.parent, self.fake_history_subtasks[0]['parent'])
        self.assertListEqual(model.children, [])

        # Update the DB record.
        children = [str(bson.ObjectId()), str(bson.ObjectId())]
        model.children = children
        ActionExecutionHistory.add_or_update(model)
        model = ActionExecutionHistory.get_by_id(obj.id)
        self.assertListEqual(model.children, children)

        # Delete the DB record.
        ActionExecutionHistory.delete(model)
        self.assertRaises(ValueError, ActionExecutionHistory.get_by_id, obj.id)
开发者ID:bjoernbessert,项目名称:st2,代码行数:28,代码来源:test_history.py


示例17: to_model

 def to_model(cls, token):
     model = super(cls, cls).to_model(token)
     model.user = str(token.user) if token.user else None
     model.token = str(token.token) if token.token else None
     model.ttl = getattr(token, 'ttl', None)
     model.expiry = isotime.parse(token.expiry) if token.expiry else None
     return model
开发者ID:bjoernbessert,项目名称:st2,代码行数:7,代码来源:access.py


示例18: purge_trigger_instances

def purge_trigger_instances(logger, timestamp):
    """
    :param timestamp: Trigger instances older than this timestamp will be deleted.
    :type timestamp: ``datetime.datetime
    """
    if not timestamp:
        raise ValueError('Specify a valid timestamp to purge.')

    logger.info('Purging trigger instances older than timestamp: %s' %
                timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))

    query_filters = {'occurrence_time__lt': isotime.parse(timestamp)}

    try:
        deleted_count = TriggerInstance.delete_by_query(**query_filters)
    except InvalidQueryError as e:
        msg = ('Bad query (%s) used to delete trigger instances: %s'
               'Please contact support.' % (query_filters, str(e)))
        raise InvalidQueryError(msg)
    except:
        logger.exception('Deleting instances using query_filters %s failed.', query_filters)
    else:
        logger.info('Deleted %s trigger instance objects' % (deleted_count))

    # Print stats
    logger.info('All trigger instance models older than timestamp %s were deleted.', timestamp)
开发者ID:lyandut,项目名称:st2,代码行数:26,代码来源:trigger_instances.py


示例19: to_model

    def to_model(cls, instance):
        trigger = instance.trigger
        payload = instance.payload
        occurrence_time = isotime.parse(instance.occurrence_time)

        model = cls.model(trigger=trigger, payload=payload, occurrence_time=occurrence_time)
        return model
开发者ID:KenMercusLai,项目名称:st2,代码行数:7,代码来源:trigger.py


示例20: test_get_all

 def test_get_all(self):
     self._get_actionexecution_id(self._do_post(LIVE_ACTION_1))
     self._get_actionexecution_id(self._do_post(LIVE_ACTION_2))
     resp = self.app.get('/v1/executions')
     body = resp.json
     self.assertEqual(resp.status_int, 200)
     self.assertEqual(len(resp.json), 2,
                      '/v1/executions did not return all '
                      'actionexecutions.')
     # Assert liveactions are sorted by timestamp.
     for i in range(len(body) - 1):
         self.assertTrue(isotime.parse(body[i]['start_timestamp']) >=
                         isotime.parse(body[i + 1]['start_timestamp']))
         self.assertTrue('web_url' in body[i])
         if 'end_timestamp' in body[i]:
             self.assertTrue('elapsed_seconds' in body[i])
开发者ID:LindsayHill,项目名称:st2,代码行数:16,代码来源:test_executions_simple.py



注:本文中的st2common.util.isotime.parse函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python jinja.get_jinja_environment函数代码示例发布时间:2022-05-27
下一篇:
Python isotime.format函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap