• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python isotime.add_utc_tz函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 st2common.util.isotime.add_utc_tz 函数的典型用法代码示例。如果您正苦于以下问题：Python add_utc_tz 函数的具体用法？Python add_utc_tz 怎么用？Python add_utc_tz 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 add_utc_tz 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: test_token_post_set_ttl

 def test_token_post_set_ttl(self):
     """POST /tokens with a 60 second TTL and check the returned expiry window."""
     requested_ttl = datetime.timedelta(seconds=60)
     # Lower bound: sampled before the request is issued.
     lower_bound = isotime.add_utc_tz(datetime.datetime.utcnow())
     response = self.app.post_json('/tokens', {'ttl': 60}, expect_errors=False)
     # Upper bound: "now + TTL", sampled only after the response came back.
     upper_bound = isotime.add_utc_tz(datetime.datetime.utcnow() + requested_ttl)
     self.assertEqual(response.status_int, 201)
     actual_expiry = isotime.parse(response.json['expiry'])
     self.assertLess(lower_bound, actual_expiry)
     self.assertLess(actual_expiry, upper_bound)
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:9,代码来源:test_token.py


示例2: _test_token_post

 def _test_token_post(self):
     """POST /tokens with an empty body and check status, payload and expiry window."""
     default_ttl = cfg.CONF.auth.token_ttl
     # Lower bound: sampled before the request is issued.
     lower_bound = isotime.add_utc_tz(datetime.datetime.utcnow())
     response = self.app.post_json('/tokens', {}, expect_errors=False)
     # Upper bound: "now + configured TTL", sampled after the response came back.
     upper_bound = isotime.add_utc_tz(
         datetime.datetime.utcnow() + datetime.timedelta(seconds=default_ttl))
     self.assertEqual(response.status_int, 201)
     payload = response.json
     self.assertIsNotNone(payload['token'])
     self.assertEqual(payload['user'], USERNAME)
     actual_expiry = isotime.parse(payload['expiry'])
     self.assertLess(lower_bound, actual_expiry)
     self.assertLess(actual_expiry, upper_bound)
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:12,代码来源:test_token.py


示例3: test_token_successfully_obtained

 def test_token_successfully_obtained(self):
     """The registrar holds auth credentials whose token lives for more than ten minutes."""
     # utcnow() instead of now(): the value is tagged as UTC by add_utc_tz, so a
     # local wall-clock reading would skew the comparison by the local UTC offset.
     time_now = isotime.add_utc_tz(datetime.datetime.utcnow())
     registrar = InternalTriggerTypesRegistrar()
     self.assertTrue(registrar._auth_creds is not None)
     # TTL is at least 10 mins. total_seconds() is used because timedelta.seconds
     # ignores the days component and is positive even for a negative delta, which
     # would let an already expired token pass the check.
     remaining = registrar._auth_creds.expiry - time_now
     self.assertTrue(remaining.total_seconds() > 10 * 60)
     delete_token(registrar._auth_creds.token)
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:7,代码来源:test_internal_trigger_types_registrar.py


示例4: validate_token

def validate_token(token_in_headers, token_in_query_params):
    """
    Look up the supplied authentication token and reject it when it has expired.

    The header value takes precedence when both sources carry a token.

    :param token_in_headers: Authentication token provided via headers.
    :type token_in_headers: ``str``

    :param token_in_query_params: Authentication token provided via query params.
    :type token_in_query_params: ``str``

    :raises exceptions.TokenNotProvidedError: If neither argument carries a token.
    :raises exceptions.TokenExpiredError: If the token expiry is not in the future.

    :return: TokenDB object on success.
    :rtype: :class:`.TokenDB`
    """
    if not (token_in_headers or token_in_query_params):
        LOG.audit('Token is not found in header or query parameters.')
        raise exceptions.TokenNotProvidedError('Token is not provided.')

    # Record every source the token arrived through (headers first).
    sources = (
        (token_in_headers, 'Token provided in headers'),
        (token_in_query_params, 'Token provided in query parameters'),
    )
    for provided, message in sources:
        if provided:
            LOG.audit(message)

    token = Token.get(token_in_headers or token_in_query_params)

    has_expired = token.expiry <= isotime.add_utc_tz(datetime.datetime.utcnow())
    if not has_expired:
        LOG.audit('Token with id "%s" is validated.' % (token.id))
        return token

    # TODO: purge expired tokens
    LOG.audit('Token with id "%s" has expired.' % (token.id))
    raise exceptions.TokenExpiredError('Token has expired.')
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:33,代码来源:auth.py


示例5: test_create_token_ttl_ok

 def test_create_token_ttl_ok(self):
     """A token created with a 10 second TTL expires before "now + TTL" sampled afterwards."""
     ttl = 10
     token = access.create_token(USERNAME, ttl)
     self.assertIsNotNone(token)
     self.assertIsNotNone(token.token)
     self.assertEqual(token.user, USERNAME)
     # The upper bound is sampled after the token was created, so it is strictly later.
     upper_bound = isotime.add_utc_tz(
         datetime.datetime.utcnow() + datetime.timedelta(seconds=ttl))
     self.assertLess(isotime.parse(token.expiry), upper_bound)
开发者ID:rgaertner,项目名称:st2,代码行数:9,代码来源:test_access.py


示例6: test_create_token_ttl_capped

 def test_create_token_ttl_capped(self):
     """The created token expires before "now + (configured max TTL + 10 seconds)"."""
     over_limit_ttl = cfg.CONF.auth.token_ttl + 10
     upper_bound = isotime.add_utc_tz(
         datetime.datetime.utcnow() + datetime.timedelta(seconds=over_limit_ttl))
     # NOTE(review): the token is requested with a literal 10 second TTL rather than
     # the over-limit value computed above -- confirm whether that is intentional.
     token = access.create_token('manas', 10)
     self.assertIsNotNone(token)
     self.assertIsNotNone(token.token)
     self.assertEqual(token.user, 'manas')
     self.assertLess(isotime.parse(token.expiry), upper_bound)
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:9,代码来源:test_access.py


示例7: _validate_token

 def _validate_token(self, env):
     """
     Validate the token carried in the ``HTTP_X_AUTH_TOKEN`` entry of ``env``.

     Raises ``exceptions.TokenNotProvidedError`` when the entry is absent and
     ``exceptions.TokenExpiredError`` when the token expiry is not in the future.
     Returns the object produced by ``Token.get`` otherwise.
     """
     header_name = 'HTTP_X_AUTH_TOKEN'
     if header_name not in env:
         LOG.audit('Token is not found in header.')
         raise exceptions.TokenNotProvidedError('Token is not provided.')

     token = Token.get(env[header_name])
     has_expired = token.expiry <= isotime.add_utc_tz(datetime.datetime.utcnow())
     if not has_expired:
         LOG.audit('Token "%s" is validated.' % env[header_name])
         return token

     LOG.audit('Token "%s" has expired.' % env[header_name])
     raise exceptions.TokenExpiredError('Token has expired.')
开发者ID:bjoernbessert,项目名称:st2,代码行数:11,代码来源:auth.py


示例8: _update_live_action_db

    def _update_live_action_db(self, liveaction_id, status, result, context):
        """
        Load the live action with the given id and store the new status on it.

        An end timestamp (current UTC time) is supplied only when ``status`` is one
        of ``DONE_STATES``; otherwise ``None`` is passed. Returns whatever
        ``update_liveaction_status`` returns.
        """
        current_db = get_liveaction_by_id(liveaction_id)
        finished_at = (isotime.add_utc_tz(datetime.datetime.utcnow())
                       if status in DONE_STATES else None)
        return update_liveaction_status(status=status,
                                        result=result,
                                        context=context,
                                        end_timestamp=finished_at,
                                        liveaction_db=current_db)
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:13,代码来源:base.py


示例9: test_format

 def test_format(self):
     """isotime.format honours the usec/offset flags and accepts already formatted strings."""
     moment = isotime.add_utc_tz(datetime.datetime(2000, 1, 1, 12))

     # (usec, offset) -> expected rendering of ``moment``.
     datetime_cases = (
         ((True, True), '2000-01-01T12:00:00.000000+00:00'),
         ((True, False), '2000-01-01T12:00:00.000000Z'),
         ((False, True), '2000-01-01T12:00:00+00:00'),
         ((False, False), '2000-01-01T12:00:00Z'),
     )
     for (usec, offset), expected in datetime_cases:
         self.assertEqual(isotime.format(moment, usec=usec, offset=offset), expected)

     # Byte and unicode strings are expected to come back equal to the input.
     for text in ('2000-01-01T12:00:00Z', u'2000-01-01T12:00:00Z'):
         self.assertEqual(isotime.format(text, usec=False, offset=False), text)
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:13,代码来源:test_isotime.py


示例10: test_parse

 def test_parse(self):
     """Every supported textual spelling of 2000-01-01 12:00 UTC parses to the same value."""
     expected = isotime.add_utc_tz(datetime.datetime(2000, 1, 1, 12))
     spellings = (
         # Space separated date and time.
         '2000-01-01 12:00:00Z',
         '2000-01-01 12:00:00+00',
         '2000-01-01 12:00:00+0000',
         '2000-01-01 12:00:00+00:00',
         '2000-01-01 12:00:00.000000Z',
         '2000-01-01 12:00:00.000000+00',
         '2000-01-01 12:00:00.000000+0000',
         '2000-01-01 12:00:00.000000+00:00',
         # "T" separated date and time.
         '2000-01-01T12:00:00Z',
         '2000-01-01T12:00:00+00:00',
         '2000-01-01T12:00:00.000000Z',
         '2000-01-01T12:00:00.000000+00:00',
         '2000-01-01T12:00:00.000Z',
     )
     for text in spellings:
         self.assertEqual(isotime.parse(text), expected)
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:15,代码来源:test_isotime.py


示例11: delete

    def delete(self, exec_id):
        """
        Stops a single execution.

        Handles requests:
            DELETE /actionexecutions/<id>

        The linked live action is marked as canceled, stamped with the current UTC
        time as its end timestamp and persisted; the execution is then updated from
        it and returned as an ``ActionExecutionAPI`` object.

        :param exec_id: Id of the execution to cancel.

        Aborts with NOT_FOUND when the execution does not exist, with
        INTERNAL_SERVER_ERROR when the live action cannot be resolved or saved, and
        with OK when the live action is already canceled or is not cancelable.
        """
        execution_api = self._get_one(id=exec_id)

        if not execution_api:
            abort(http_client.NOT_FOUND, 'Execution with id %s not found.' % exec_id)
            return

        liveaction_id = execution_api.liveaction['id']
        if not liveaction_id:
            abort(http_client.INTERNAL_SERVER_ERROR,
                  'Execution object missing link to liveaction %s.' % liveaction_id)
            return

        try:
            liveaction_db = LiveAction.get_by_id(liveaction_id)
        except Exception:
            # Narrowed from a bare ``except`` so SystemExit / KeyboardInterrupt are not
            # swallowed; the cause is logged instead of being silently discarded.
            LOG.exception('Failed retrieving liveaction %s.', liveaction_id)
            abort(http_client.INTERNAL_SERVER_ERROR,
                  'Execution object missing link to liveaction %s.' % liveaction_id)
            return

        if liveaction_db.status == LIVEACTION_STATUS_CANCELED:
            abort(http_client.OK,
                  'Action is already in "canceled" state.')
            return

        if liveaction_db.status not in CANCELABLE_STATES:
            abort(http_client.OK,
                  'Action cannot be canceled. State = %s.' % liveaction_db.status)
            return

        # Use the same constant the "already canceled" check above compares against,
        # so the written and the checked status can never drift apart.
        liveaction_db.status = LIVEACTION_STATUS_CANCELED
        liveaction_db.end_timestamp = isotime.add_utc_tz(datetime.datetime.utcnow())
        liveaction_db.result = {'message': 'Action canceled by user.'}
        try:
            LiveAction.add_or_update(liveaction_db)
        except Exception:
            LOG.exception('Failed updating status to canceled for liveaction %s.',
                          liveaction_db.id)
            abort(http_client.INTERNAL_SERVER_ERROR, 'Failed canceling execution.')
            return

        execution_db = execution_service.update_execution(liveaction_db)
        return ActionExecutionAPI.from_model(execution_db)
开发者ID:SamMarkowitz,项目名称:st2,代码行数:48,代码来源:actionexecutions.py


示例12: test_datetime_range

    def test_datetime_range(self):
        """Range queries on ``timestamp`` return ten objects, ordered by range direction."""
        start = isotime.add_utc_tz(datetime.datetime(2014, 12, 25, 0, 0, 0))
        # One object per second for the first minute after ``start``.
        for offset in range(60):
            moment = start + datetime.timedelta(seconds=offset)
            self.access.add_or_update(FakeModelDB(name=uuid.uuid4().hex, timestamp=moment))

        # Range written oldest..newest: results are expected oldest first.
        ascending = self.access.query(timestamp='2014-12-25T00:00:10Z..2014-12-25T00:00:19Z')
        self.assertEqual(len(ascending), 10)
        self.assertLess(ascending[0].timestamp, ascending[9].timestamp)

        # Range written newest..oldest: results are expected newest first.
        descending = self.access.query(timestamp='2014-12-25T00:00:19Z..2014-12-25T00:00:10Z')
        self.assertEqual(len(descending), 10)
        self.assertLess(descending[9].timestamp, descending[0].timestamp)
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:16,代码来源:test_persistence.py


示例13: test_delayed_executions_recovery_before_timeout

    def test_delayed_executions_recovery_before_timeout(self):
        """A delayed live action that started just now is left untouched by recovery."""
        # Delayed live action whose start timestamp is the current UTC time.
        started_at = isotime.add_utc_tz(datetime.datetime.utcnow())
        pending = LiveActionDB(action='wolfpack.action-1',
                               parameters={'actionstr': 'foo'},
                               start_timestamp=started_at,
                               status=action_constants.LIVEACTION_STATUS_DELAYED)
        pending = LiveAction.add_or_update(pending, publish=False)
        executions.create_execution_object(pending, publish=False)

        # Run the rescheduling routine.
        scheduler.recover_delayed_executions()

        # Reload and confirm the status is still "delayed".
        reloaded = LiveAction.get_by_id(str(pending.id))
        self.assertEqual(reloaded.status, action_constants.LIVEACTION_STATUS_DELAYED)
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:16,代码来源:test_scheduler.py


示例14: create_token

def create_token(username, ttl=None, metadata=None):
    """
    Create and persist a new access token for the given user.

    :param username: Username of the user to create the token for. If the account for this user
                     doesn't exist yet it will be created.
    :type username: ``str``

    :param ttl: Token TTL (in seconds). A falsy value (``None`` or ``0``) selects
                ``cfg.CONF.auth.token_ttl``.
    :type ttl: ``int``

    :param metadata: Optional metadata to associate with the token.
    :type metadata: ``dict``

    :raises TTLTooLargeException: If ``ttl`` exceeds ``cfg.CONF.auth.token_ttl``.

    :return: The ``TokenDB`` object that was handed to ``Token.add_or_update``.
    """

    if ttl:
        if ttl > cfg.CONF.auth.token_ttl:
            msg = 'TTL specified %s is greater than max allowed %s.' % (
                ttl, cfg.CONF.auth.token_ttl
            )
            raise TTLTooLargeException(msg)
    else:
        ttl = cfg.CONF.auth.token_ttl

    if username:
        try:
            User.get_by_name(username)
        except Exception:
            # Narrowed from a bare ``except`` so SystemExit / KeyboardInterrupt are no
            # longer treated as "user does not exist".
            # NOTE(review): any lookup failure still registers a new user; consider
            # catching only the "not found" error that User.get_by_name raises.
            user = UserDB(name=username)
            User.add_or_update(user)

            extra = {'username': username, 'user': user}
            LOG.audit('Registered new user "%s".' % (username), extra=extra)

    # Separate names for the raw token string and the DB object built from it.
    token_string = uuid.uuid4().hex
    expiry = datetime.datetime.utcnow() + datetime.timedelta(seconds=ttl)
    expiry = isotime.add_utc_tz(expiry)
    token_db = TokenDB(user=username, token=token_string, expiry=expiry, metadata=metadata)
    Token.add_or_update(token_db)

    username_string = username if username else 'an anonymous user'
    token_expire_string = isotime.format(expiry, offset=False)
    extra = {'username': username, 'token_expiration': token_expire_string}

    LOG.audit('Access granted to "%s" with the token set to expire at "%s".' %
              (username_string, token_expire_string), extra=extra)

    return token_db
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:46,代码来源:access.py


示例15: test_sort_multiple

    def test_sort_multiple(self):
        """Ordering by ``['category', 'timestamp']`` groups by category, then sorts by time."""
        count = 60
        # Floor division keeps the value usable as a list index when "/" is true
        # division (Python 3, or Python 2 with ``from __future__ import division``).
        half = count // 2
        base = isotime.add_utc_tz(datetime.datetime(2014, 12, 25, 0, 0, 0))
        for i in range(count):
            # Odd offsets become 'type1', even offsets 'type2'.
            category = 'type1' if i % 2 else 'type2'
            timestamp = base + datetime.timedelta(seconds=i)
            obj = FakeModelDB(name=uuid.uuid4().hex, timestamp=timestamp, category=category)
            self.access.add_or_update(obj)

        objs = self.access.query(order_by=['category', 'timestamp'])
        self.assertEqual(len(objs), count)
        for i in range(count):
            category = 'type1' if i < half else 'type2'
            self.assertEqual(objs[i].category, category)
        # Timestamps ascend inside each category ...
        self.assertLess(objs[0].timestamp, objs[half - 1].timestamp)
        # ... and restart at the category boundary: the first 'type2' object
        # (offset 0) is older than the last 'type1' object (offset 59).
        self.assertLess(objs[half].timestamp, objs[half - 1].timestamp)
        self.assertLess(objs[half].timestamp, objs[count - 1].timestamp)
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:17,代码来源:test_persistence.py


示例16: test_token_model

 def test_token_model(self):
     """Round-trip a token: TokenAPI -> TokenDB -> persistence -> TokenAPI."""
     now_utc = isotime.add_utc_tz(datetime.datetime.utcnow())
     api_in = TokenAPI(user='stanley', token=uuid.uuid4().hex,
                       expiry=isotime.format(now_utc, offset=False))

     # API object -> DB model.
     model = TokenAPI.to_model(api_in)
     self.assertIsNotNone(model)
     self.assertIsInstance(model, TokenDB)
     self.assertEqual(model.user, api_in.user)
     self.assertEqual(model.token, api_in.token)
     self.assertEqual(model.expiry, isotime.parse(api_in.expiry))

     # DB model -> persisted DB model.
     saved = Token.add_or_update(model)
     self.assertEqual(model, saved)
     self.assertIsNotNone(saved.id)

     # Persisted DB model -> API object; all fields must survive the round trip.
     api_out = TokenAPI.from_model(saved)
     for field in ('user', 'token', 'expiry'):
         self.assertEqual(getattr(api_out, field), getattr(api_in, field))
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:17,代码来源:test_token.py


示例17: test_datetime_range

    def test_datetime_range(self):
        """Range queries on ``start_timestamp`` match ten executions in either direction."""
        start = isotime.add_utc_tz(datetime.datetime(2014, 12, 25, 0, 0, 0))
        # One execution per second for the first minute after ``start``.
        for offset in range(60):
            moment = start + datetime.timedelta(seconds=offset)
            record = copy.deepcopy(self.fake_history_subtasks[0])
            record['id'] = str(bson.ObjectId())
            record['start_timestamp'] = isotime.format(moment)
            api_obj = ActionExecutionAPI(**record)
            ActionExecution.add_or_update(ActionExecutionAPI.to_model(api_obj))

        # The same ten second window, written oldest..newest and newest..oldest.
        windows = (
            '2014-12-25T00:00:10Z..2014-12-25T00:00:19Z',
            '2014-12-25T00:00:19Z..2014-12-25T00:00:10Z',
        )
        for dt_range in windows:
            matches = ActionExecution.query(start_timestamp=dt_range)
            self.assertEqual(len(matches), 10)
开发者ID:BlazeMediaGroup,项目名称:st2,代码行数:17,代码来源:test_executions.py


示例18: setUpClass

    def setUpClass(cls):
        """
        Populate the history collection with ``cls.num_records`` fake records.

        Each record is a deep copy of a randomly chosen template from
        ``cls.fake_types`` with a fresh id and a start timestamp one second after
        the previous one. Saved objects are kept in ``cls.refs`` keyed by id.
        """
        # NOTE(review): this method takes ``cls`` and calls the parent setUpClass, so
        # it is presumably decorated with @classmethod outside the visible excerpt.
        super(TestActionExecutionHistory, cls).setUpClass()

        cls.dt_base = isotime.add_utc_tz(datetime.datetime(2014, 12, 25, 0, 0, 0))
        cls.num_records = 100
        cls.refs = {}

        cls.fake_types = [
            {
                'trigger': copy.deepcopy(fixture.ARTIFACTS['trigger']),
                'trigger_type': copy.deepcopy(fixture.ARTIFACTS['trigger_type']),
                'trigger_instance': copy.deepcopy(fixture.ARTIFACTS['trigger_instance']),
                'rule': copy.deepcopy(fixture.ARTIFACTS['rule']),
                'action': copy.deepcopy(fixture.ARTIFACTS['actions']['chain']),
                'runner': copy.deepcopy(fixture.ARTIFACTS['runners']['action-chain']),
                'execution': copy.deepcopy(fixture.ARTIFACTS['executions']['workflow']),
                'children': []
            },
            {
                'action': copy.deepcopy(fixture.ARTIFACTS['actions']['local']),
                'runner': copy.deepcopy(fixture.ARTIFACTS['runners']['run-local']),
                'execution': copy.deepcopy(fixture.ARTIFACTS['executions']['task1'])
            }
        ]

        def assign_parent(child):
            # Attach ``child`` to a random, already saved record whose action is 'chain'.
            # values() replaces the Python 2 only iteritems(); the keys were unused.
            candidates = [v for v in cls.refs.values() if v.action['name'] == 'chain']
            if candidates:
                parent = random.choice(candidates)
                child['parent'] = str(parent.id)
                parent.children.append(child['id'])
                # Re-save the parent so the stored copy carries the new child id.
                cls.refs[str(parent.id)] = ActionExecutionHistory.add_or_update(parent)

        for i in range(cls.num_records):
            obj_id = str(bson.ObjectId())
            timestamp = cls.dt_base + datetime.timedelta(seconds=i)
            fake_type = random.choice(cls.fake_types)
            data = copy.deepcopy(fake_type)
            data['id'] = obj_id
            data['execution']['start_timestamp'] = isotime.format(timestamp, offset=False)
            # Roughly half of the 'local' records are linked to a 'chain' parent.
            if fake_type['action']['name'] == 'local' and random.choice([True, False]):
                assign_parent(data)
            wb_obj = ActionExecutionHistoryAPI(**data)
            db_obj = ActionExecutionHistoryAPI.to_model(wb_obj)
            cls.refs[obj_id] = ActionExecutionHistory.add_or_update(db_obj)
开发者ID:bjoernbessert,项目名称:st2,代码行数:45,代码来源:test_history.py


示例19: test_sort_by_start_timestamp

    def test_sort_by_start_timestamp(self):
        """Range queries honour ascending and descending ``order_by`` on the start timestamp."""
        start = isotime.add_utc_tz(datetime.datetime(2014, 12, 25, 0, 0, 0))
        # One history record per second for the first minute after ``start``.
        for offset in range(60):
            moment = start + datetime.timedelta(seconds=offset)
            record = copy.deepcopy(self.fake_history_subtasks[0])
            record['id'] = str(bson.ObjectId())
            record['execution']['start_timestamp'] = isotime.format(moment)
            api_obj = ActionExecutionHistoryAPI(**record)
            ActionExecutionHistory.add_or_update(ActionExecutionHistoryAPI.to_model(api_obj))

        # Ascending order: the first result must be older than the tenth.
        ascending = ActionExecutionHistory.query(
            execution__start_timestamp='2014-12-25T00:00:10Z..2014-12-25T00:00:19Z',
            order_by=['execution__start_timestamp'])
        self.assertLess(ascending[0].execution['start_timestamp'],
                        ascending[9].execution['start_timestamp'])

        # Descending order: the tenth result must be older than the first.
        descending = ActionExecutionHistory.query(
            execution__start_timestamp='2014-12-25T00:00:19Z..2014-12-25T00:00:10Z',
            order_by=['-execution__start_timestamp'])
        self.assertLess(descending[9].execution['start_timestamp'],
                        descending[0].execution['start_timestamp'])
开发者ID:bjoernbessert,项目名称:st2,代码行数:19,代码来源:test_history.py


示例20: test_delayed_executions_recovery

    def test_delayed_executions_recovery(self):
        """A live action delayed for the full recovery window is completed by recovery."""
        # Back-date the start timestamp by the configured recovery window so the
        # live action counts as delayed past the allowed timeout.
        now_utc = isotime.add_utc_tz(datetime.datetime.utcnow())
        recovery_window = datetime.timedelta(
            seconds=cfg.CONF.scheduler.delayed_execution_recovery)
        stale_start = now_utc - recovery_window

        stale = LiveActionDB(action='wolfpack.action-1',
                             parameters={'actionstr': 'foo'},
                             start_timestamp=stale_start,
                             status=action_constants.LIVEACTION_STATUS_DELAYED)
        stale = LiveAction.add_or_update(stale, publish=False)
        executions.create_execution_object(stale, publish=False)

        # Run the rescheduling routine.
        scheduler.recover_delayed_executions()

        # Reload and confirm the live action ran to completion.
        reloaded = LiveAction.get_by_id(str(stale.id))
        self.assertEqual(reloaded.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
开发者ID:Kailashkatheth1,项目名称:st2,代码行数:20,代码来源:test_scheduler.py



注：本文中的 st2common.util.isotime.add_utc_tz 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python isotime.format函数代码示例发布时间:2022-05-27
下一篇:
Python shell.run_command函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap