• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utc.get_expiry_date函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 superdesk.utc.get_expiry_date 函数的典型用法代码示例。如果您想了解 get_expiry_date 函数的具体用法、调用方式或实际使用的例子,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_expiry_date函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __set_published_item_expiry

    def __set_published_item_expiry(self, doc):
        """Set ``doc['expiry']`` using the published-item expiry of the item's desk.

        The desk referenced by ``doc['task']['desk']`` (when present) supplies
        ``published_item_expiry``; otherwise ``config.PUBLISHED_ITEMS_EXPIRY_MINUTES``
        is used. If the document carries an embargo value, it is passed to
        ``get_expiry_date`` as the offset.

        :param doc: item document, updated in place with an ``expiry`` key.
        """
        desk_id = doc.get('task', {}).get('desk', None)
        desk = {}

        if desk_id:
            # The lookup may come back empty (e.g. a stale desk reference); fall back to an
            # empty dict so the default expiry below applies instead of an AttributeError.
            desk = get_resource_service('desks').find_one(req=None, _id=desk_id) or {}

        expiry_minutes = desk.get('published_item_expiry', config.PUBLISHED_ITEMS_EXPIRY_MINUTES)
        if doc.get(EMBARGO):
            doc['expiry'] = get_expiry_date(expiry_minutes, offset=doc[EMBARGO])
        else:
            doc['expiry'] = get_expiry_date(expiry_minutes)
开发者ID:PressAssociation,项目名称:superdesk,代码行数:10,代码来源:published_item.py


示例2: test_query_getting_overdue_scheduled_content

    def test_query_getting_overdue_scheduled_content(self):
        """Expect exactly one overdue scheduled item among five archive fixtures.

        The fixtures mix states and publish schedules; presumably only the
        'scheduled' item dated ten minutes back qualifies.
        """
        self.app.data.insert(ARCHIVE, [{'publish_schedule': get_expiry_date(-10), 'state': 'published'}])
        self.app.data.insert(ARCHIVE, [{'publish_schedule': get_expiry_date(-10), 'state': 'scheduled'}])
        self.app.data.insert(ARCHIVE, [{'publish_schedule': get_expiry_date(0), 'state': 'spiked'}])
        self.app.data.insert(ARCHIVE, [{'publish_schedule': get_expiry_date(10), 'state': 'scheduled'}])
        self.app.data.insert(ARCHIVE, [{'unique_id': 97, 'state': 'spiked'}])

        now = date_to_str(utcnow())
        overdue_items = get_overdue_scheduled_items(now, 'archive')
        # assertEquals is a deprecated alias that no longer exists in Python 3.12+.
        self.assertEqual(1, overdue_items.count())
开发者ID:copyfun,项目名称:superdesk,代码行数:10,代码来源:archive_test.py


示例3: test_get_expiry_date

 def test_get_expiry_date(self):
     """Check that get_expiry_date(minutes=60) is a datetime about one hour from utcnow()."""
     self.assertIsInstance(get_expiry_date(minutes=60), datetime)
     expected = utcnow() + timedelta(minutes=60)
     actual = get_expiry_date(minutes=60)
     # `expected` and `actual` are produced by two separate calls, so comparing
     # year/month/.../second field by field fails whenever a second boundary
     # falls between them. Compare the whole timestamps with a tolerance instead.
     # NOTE(review): assumes both values are equally tz-aware so they can be subtracted.
     self.assertAlmostEqual(expected, actual, delta=timedelta(seconds=1))
开发者ID:akintolga,项目名称:superdesk-core,代码行数:10,代码来源:utc_tests.py


示例4: setUp

 def setUp(self):
     """Seed the 'archive' collection with expiry fixtures, then call init_app."""
     super().setUp()
     with self.app.app_context():
         # Items whose expiry is computed with offsets of -10 to 30 minutes.
         for minutes in (-10, 0, 10, 20, 30):
             self.app.data.insert('archive', [{'expiry': get_expiry_date(minutes)}])
         # One item with an explicit null expiry, one without the field at all.
         for fixture in ({'expiry': None}, {'unique_id': 97}):
             self.app.data.insert('archive', [fixture])
         init_app(self.app)
开发者ID:akintolga,项目名称:superdesk-server,代码行数:11,代码来源:archive_test.py


示例5: test_query_getting_overdue_scheduled_content

    def test_query_getting_overdue_scheduled_content(self):
        """Expect exactly one overdue scheduled item among five archive fixtures.

        Runs inside an application context. The fixtures mix states and publish
        schedules; presumably only the "scheduled" item dated ten minutes back qualifies.
        """
        with self.app.app_context():
            self.app.data.insert(ARCHIVE, [{"publish_schedule": get_expiry_date(-10), "state": "published"}])
            self.app.data.insert(ARCHIVE, [{"publish_schedule": get_expiry_date(-10), "state": "scheduled"}])
            self.app.data.insert(ARCHIVE, [{"publish_schedule": get_expiry_date(0), "state": "spiked"}])
            self.app.data.insert(ARCHIVE, [{"publish_schedule": get_expiry_date(10), "state": "scheduled"}])
            self.app.data.insert(ARCHIVE, [{"unique_id": 97, "state": "spiked"}])

            now = date_to_str(utcnow())
            overdue_items = get_overdue_scheduled_items(now, "archive")
            # assertEquals is a deprecated alias that no longer exists in Python 3.12+.
            self.assertEqual(1, overdue_items.count())
开发者ID:verifiedpixel,项目名称:superdesk,代码行数:11,代码来源:archive_test.py


示例6: test_query_getting_expired_content

    def test_query_getting_expired_content(self):
        """Expect the archive service to report exactly one expired item.

        Seven spiked fixtures are inserted: five with computed expiry values, one
        with a null expiry and one without an expiry field.
        """
        for minutes in (-10, 0, 10, 20, 30):
            self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(minutes), 'state': 'spiked'}])
        self.app.data.insert(ARCHIVE, [{'expiry': None, 'state': 'spiked'}])
        self.app.data.insert(ARCHIVE, [{'unique_id': 97, 'state': 'spiked'}])

        now = utcnow()
        expired_items = get_resource_service(ARCHIVE).get_expired_items(now)
        # assertEquals is a deprecated alias that no longer exists in Python 3.12+.
        self.assertEqual(1, expired_items.count())
开发者ID:hlmnrmr,项目名称:superdesk-core,代码行数:12,代码来源:archive_test.py


示例7: test_query_getting_expired_content

    def test_query_getting_expired_content(self):
        """Expect RemoveExpiredSpikeContent to report exactly two expired items.

        Runs inside an application context with seven spiked fixtures: five with
        computed expiry values, one with a null expiry and one without an expiry field.
        """
        with self.app.app_context():
            for minutes in (-10, 0, 10, 20, 30):
                self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(minutes), 'state': 'spiked'}])
            self.app.data.insert(ARCHIVE, [{'expiry': None, 'state': 'spiked'}])
            self.app.data.insert(ARCHIVE, [{'unique_id': 97, 'state': 'spiked'}])

            now = date_to_str(utcnow())
            expired_items = RemoveExpiredSpikeContent().get_expired_items(now)
            # assertEquals is a deprecated alias that no longer exists in Python 3.12+.
            self.assertEqual(2, expired_items.count())
开发者ID:verifiedpixel,项目名称:verifiedpixel,代码行数:13,代码来源:archive_test.py


示例8: test_query_getting_expired_content

    def test_query_getting_expired_content(self):
        """Expect RemoveExpiredSpikeContent to report exactly two expired items.

        Runs inside an application context with seven spiked fixtures: five with
        computed expiry values, one with a null expiry and one without an expiry field.
        """
        with self.app.app_context():
            for minutes in (-10, 0, 10, 20, 30):
                self.app.data.insert(ARCHIVE, [{"expiry": get_expiry_date(minutes), "state": "spiked"}])
            self.app.data.insert(ARCHIVE, [{"expiry": None, "state": "spiked"}])
            self.app.data.insert(ARCHIVE, [{"unique_id": 97, "state": "spiked"}])

            now = date_to_str(utcnow())
            expired_items = RemoveExpiredSpikeContent().get_expired_items(now)
            # assertEquals is a deprecated alias that no longer exists in Python 3.12+.
            self.assertEqual(2, expired_items.count())
开发者ID:verifiedpixel,项目名称:superdesk,代码行数:13,代码来源:archive_test.py


示例9: on_update

    def on_update(self, updates, original):
        """Apply stage update rules before the stage is saved.

        * A ``content_expiry`` of 0 is replaced by ``CONTENT_EXPIRY_MINUTES`` from the app settings.
        * When a content expiry is being set, the expiry of every document returned by
          ``get_stage_documents`` is recomputed from its ``versioncreated``.
        * Turning on ``working_stage`` / ``default_incoming`` on a stage that did not have it
          replaces the previous default of the desk; an update that carries the key but
          switches it off on the current default is rejected.

        :param updates: fields being changed (may be modified in place).
        :param original: the stored stage document.
        :raises SuperdeskApiError: forbidden error when a desk would lose its working or incoming stage.
        """
        if updates.get('content_expiry') == 0:
            updates['content_expiry'] = app.settings['CONTENT_EXPIRY_MINUTES']

        super().on_update(updates, original)

        if updates.get('content_expiry'):
            stage_id = str(original[config.ID_FIELD])
            for stage_doc in self.get_stage_documents(stage_id):
                new_expiry = get_expiry_date(updates['content_expiry'], stage_doc['versioncreated'])
                get_model(ItemModel).update({'_id': stage_doc[config.ID_FIELD]}, {'expiry': new_expiry})

        if updates.get('working_stage', False):
            if not original.get('working_stage'):
                desk_id = original.get('desk')
                self.remove_old_default(desk_id, 'working_stage')
                self.set_desk_ref(original, 'working_stage')
        elif original.get('working_stage') and 'working_stage' in updates:
            raise SuperdeskApiError.forbiddenError(message='Must have one working stage in a desk')

        if updates.get('default_incoming', False):
            if not original.get('default_incoming'):
                desk_id = original.get('desk')
                self.remove_old_default(desk_id, 'default_incoming')
                self.set_desk_ref(original, 'incoming_stage')
        elif original.get('default_incoming') and 'default_incoming' in updates:
            raise SuperdeskApiError.forbiddenError(message='Must have one incoming stage in a desk')
开发者ID:nistormihai,项目名称:superdesk,代码行数:28,代码来源:stages.py


示例10: update

    def update(self, id, updates, original):
        """Spike an item: set its spike expiry, remember its state and persist the update.

        :param id: identifier of the item being spiked.
        :param updates: fields to change; ``EXPIRY``, ``REVERT_STATE`` and possibly
            ``rewrite_of`` are filled in here.
        :param original: the stored item document.
        :return: the item as returned by the backend update.
        :raises InvalidStateTransitionError: when 'spike' is not valid from the original state.
        """
        if not is_workflow_state_transition_valid('spike', original[ITEM_STATE]):
            raise InvalidStateTransitionError()

        packages = PackageService()
        current_user = get_user(required=True)

        archived = get_resource_service(ARCHIVE).find_one(req=None, _id=id)
        spike_minutes = app.settings['SPIKE_EXPIRY_MINUTES']

        # An item that sits on a desk uses that desk's spike_expiry, if it defines one.
        if is_assigned_to_a_desk(archived):
            desk_doc = get_resource_service('desks').find_one(_id=archived['task']['desk'], req=None)
            spike_minutes = desk_doc.get('spike_expiry', spike_minutes)

        updates[EXPIRY] = get_expiry_date(spike_minutes)
        updates[REVERT_STATE] = archived.get(ITEM_STATE, None)

        if original.get('rewrite_of'):
            updates['rewrite_of'] = None

        spiked = self.backend.update(self.datasource, id, updates, original)
        push_notification('item:spike', item=str(spiked.get('_id')), user=str(current_user))
        packages.remove_spiked_refs_from_package(id)
        return spiked
开发者ID:verifiedpixel,项目名称:verifiedpixel,代码行数:26,代码来源:archive_spike.py


示例11: update

    def update(self, id, updates, original):
        """Spike an item: set its spike expiry, remember its state and persist the update.

        :param id: identifier of the item being spiked.
        :param updates: fields to change; ``EXPIRY``, ``REVERT_STATE`` and possibly
            ``rewrite_of`` are filled in here.
        :param original: the stored item document.
        :return: the item as returned by the backend update.
        :raises InvalidStateTransitionError: when "spike" is not valid from the original state.
        """
        if not is_workflow_state_transition_valid("spike", original[config.CONTENT_STATE]):
            raise InvalidStateTransitionError()

        packages = PackageService()
        current_user = get_user(required=True)

        archived = get_resource_service(ARCHIVE).find_one(req=None, _id=id)
        spike_minutes = app.settings["SPIKE_EXPIRY_MINUTES"]

        # An item that sits on a desk uses that desk's spike_expiry, if it defines one.
        if is_assigned_to_a_desk(archived):
            desk_doc = get_resource_service("desks").find_one(_id=archived["task"]["desk"], req=None)
            spike_minutes = desk_doc.get("spike_expiry", spike_minutes)

        updates[EXPIRY] = get_expiry_date(spike_minutes)
        updates[REVERT_STATE] = archived.get(app.config["CONTENT_STATE"], None)

        if original.get("rewrite_of"):
            updates["rewrite_of"] = None

        spiked = self.backend.update(self.datasource, id, updates, original)
        push_notification("item:spike", item=str(spiked.get("_id")), user=str(current_user))
        packages.remove_spiked_refs_from_package(id)
        return spiked
开发者ID:oxcarh,项目名称:superdesk,代码行数:26,代码来源:archive_spike.py


示例12: ingest_item

def ingest_item(item, provider, rule_set=None, routing_scheme=None):
    """Prepare an ingested item and store it in the ingest collection.

    The item gets an id (if missing), family id, provider reference, source, default
    state and expiry; categories/subjects are processed or derived; the rule set is
    applied; renditions are updated; finally the item is inserted or, when an item with
    the same guid already exists, replaces it while keeping the stored id. When a
    routing scheme is given it is applied to the stored item.

    :param item: item dict, modified in place.
    :param provider: ingest provider document the item came from.
    :param rule_set: optional rule set handed to ``apply_rule_set``.
    :param routing_scheme: optional routing scheme applied after the item is stored.
    :return: ``True`` on success, ``False`` if any exception was raised (it is logged).
    """
    try:
        item.setdefault(superdesk.config.ID_FIELD, generate_guid(type=GUID_NEWSML))
        item[FAMILY_ID] = item[superdesk.config.ID_FIELD]
        providers[provider.get('type')].provider = provider

        item['ingest_provider'] = str(provider[superdesk.config.ID_FIELD])
        item.setdefault('source', provider.get('source', ''))
        set_default_state(item, STATE_INGESTED)
        item['expiry'] = get_expiry_date(provider.get('content_expiry', app.config['INGEST_EXPIRY_MINUTES']),
                                         item.get('versioncreated'))

        if 'anpa_category' in item:
            process_anpa_category(item, provider)

        if 'subject' in item:
            process_iptc_codes(item, provider)
            if 'anpa_category' not in item:
                derive_category(item, provider)
        elif 'anpa_category' in item:
            derive_subject(item)

        apply_rule_set(item, provider, rule_set)

        ingest_service = superdesk.get_resource_service('ingest')

        if item.get('ingest_provider_sequence') is None:
            ingest_service.set_ingest_provider_sequence(item, provider)

        old_item = ingest_service.find_one(guid=item[GUID_FIELD], req=None)

        rend = item.get('renditions', {})
        if rend:
            # Prefer the 'baseImage' rendition, otherwise take whichever rendition comes first.
            base_image_rend = rend.get('baseImage') or next(iter(rend.values()))
            if base_image_rend:
                href = providers[provider.get('type')].prepare_href(base_image_rend['href'])
                update_renditions(item, href, old_item)

        if old_item:
            # In case we already have the item, preserve the _id
            item[superdesk.config.ID_FIELD] = old_item[superdesk.config.ID_FIELD]
            ingest_service.put_in_mongo(item[superdesk.config.ID_FIELD], item)
        else:
            try:
                ingest_service.post_in_mongo([item])
            except HTTPException:
                # The original passed the exception as a %-format argument without a
                # placeholder, which makes the log record fail to format.
                # logger.exception keeps the message and attaches the traceback.
                logger.exception("Exception while persisting item in ingest collection")

        if routing_scheme:
            routed = ingest_service.find_one(_id=item[superdesk.config.ID_FIELD], req=None)
            superdesk.get_resource_service('routing_schemes').apply_routing_scheme(routed, provider, routing_scheme)
    except Exception as ex:
        logger.exception(ex)
        try:
            superdesk.app.sentry.captureException()
        except Exception:
            # Error reporting is best-effort; unlike a bare ``except:`` this no longer
            # swallows KeyboardInterrupt/SystemExit.
            pass
        return False
    return True
开发者ID:ancafarcas,项目名称:superdesk-core,代码行数:59,代码来源:update_ingest.py


示例13: test_query_getting_expired_content

    def test_query_getting_expired_content(self):
        """Each batch of expired items holds only the fixture tagged 'expired'.

        The original also issued a first ``get_expired_items`` call whose result was
        immediately shadowed by the loop variable; that dead query is dropped here.
        """
        now = utcnow()

        self.app.data.insert(ARCHIVE, [{'expiry': now - timedelta(minutes=10), 'state': 'spiked',
                                        'unique_id': 'expired'}])
        self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(0), 'state': 'spiked'}])
        self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(10), 'state': 'spiked'}])
        self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(20), 'state': 'spiked'}])
        self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(30), 'state': 'spiked'}])
        self.app.data.insert(ARCHIVE, [{'expiry': None, 'state': 'spiked'}])
        self.app.data.insert(ARCHIVE, [{'unique_id': 97, 'state': 'spiked'}])

        # Re-read the clock after the fixtures are stored, as the original did.
        now = utcnow()
        for batch in get_resource_service(ARCHIVE).get_expired_items(now):
            # assertEquals is a deprecated alias that no longer exists in Python 3.12+.
            self.assertEqual(1, len(batch))
            self.assertEqual('expired', batch[0]['unique_id'])
开发者ID:superdesk,项目名称:superdesk-core,代码行数:17,代码来源:archive_test.py


示例14: __set_published_item_expiry

    def __set_published_item_expiry(self, doc):
        """Set ``doc["expiry"]`` using the published-item expiry of the item's desk.

        The desk referenced by ``doc["task"]["desk"]`` (when present) supplies
        ``published_item_expiry``; otherwise ``config.PUBLISHED_ITEMS_EXPIRY_MINUTES`` is used.

        :param doc: item document, updated in place with an ``expiry`` key.
        """
        desk_id = doc.get("task", {}).get("desk", None)
        desk = {}

        if desk_id:
            # The lookup may come back empty (e.g. a stale desk reference); fall back to an
            # empty dict so the default expiry below applies instead of an AttributeError.
            desk = get_resource_service("desks").find_one(req=None, _id=desk_id) or {}

        expiry_minutes = desk.get("published_item_expiry", config.PUBLISHED_ITEMS_EXPIRY_MINUTES)
        doc["expiry"] = get_expiry_date(expiry_minutes)
开发者ID:m038,项目名称:superdesk,代码行数:9,代码来源:published_item.py


示例15: ingest_item

def ingest_item(item, provider, rule_set=None, routing_scheme=None):
    """Prepare an ingested item and store it in the ingest collection.

    The item gets an id (if missing), family id, provider reference, source, default
    state and expiry; categories and subjects are processed; the rule set is applied;
    renditions are updated; finally the item is inserted or, when an item with the same
    guid already exists, replaces it while keeping the stored id. When a routing scheme
    is given it is applied to the stored item.

    :param item: item dict, modified in place.
    :param provider: ingest provider document the item came from.
    :param rule_set: optional rule set handed to ``apply_rule_set``.
    :param routing_scheme: optional routing scheme applied after the item is stored.
    :return: ``True`` on success, ``False`` if any exception was raised (it is logged).
    """
    try:
        item.setdefault(superdesk.config.ID_FIELD, generate_guid(type=GUID_NEWSML))
        item[FAMILY_ID] = item[superdesk.config.ID_FIELD]
        providers[provider.get("type")].provider = provider

        item["ingest_provider"] = str(provider[superdesk.config.ID_FIELD])
        item.setdefault("source", provider.get("source", ""))
        set_default_state(item, STATE_INGESTED)
        item["expiry"] = get_expiry_date(
            provider.get("content_expiry", INGEST_EXPIRY_MINUTES), item.get("versioncreated")
        )

        if "anpa-category" in item:
            process_anpa_category(item, provider)

        if "subject" in item:
            process_iptc_codes(item, provider)

        apply_rule_set(item, provider, rule_set)

        ingest_service = superdesk.get_resource_service("ingest")

        if item.get("ingest_provider_sequence") is None:
            ingest_service.set_ingest_provider_sequence(item, provider)

        rend = item.get("renditions", {})
        if rend:
            # Prefer the "baseImage" rendition, otherwise take whichever rendition comes first.
            base_image_rend = rend.get("baseImage") or next(iter(rend.values()))
            if base_image_rend:
                href = providers[provider.get("type")].prepare_href(base_image_rend["href"])
                update_renditions(item, href)

        old_item = ingest_service.find_one(guid=item[GUID_FIELD], req=None)

        if old_item:
            # In case we already have the item, preserve the _id
            item[superdesk.config.ID_FIELD] = old_item[superdesk.config.ID_FIELD]
            ingest_service.put_in_mongo(item[superdesk.config.ID_FIELD], item)
        else:
            try:
                ingest_service.post_in_mongo([item])
            except HTTPException:
                # The original passed the exception as a %-format argument without a
                # placeholder, which makes the log record fail to format.
                # logger.exception keeps the message and attaches the traceback.
                logger.exception("Exception while persisting item in ingest collection")

        if routing_scheme:
            routed = ingest_service.find_one(_id=item[superdesk.config.ID_FIELD], req=None)
            superdesk.get_resource_service("routing_schemes").apply_routing_scheme(routed, provider, routing_scheme)
    except Exception as ex:
        logger.exception(ex)
        try:
            superdesk.app.sentry.captureException()
        except Exception:
            # Error reporting is best-effort; unlike a bare ``except:`` this no longer
            # swallows KeyboardInterrupt/SystemExit.
            pass
        return False
    return True
开发者ID:Flowdeeps,项目名称:superdesk-1,代码行数:56,代码来源:update_ingest.py


示例16: test_get_expiry_date_with_offset

 def test_get_expiry_date_with_offset(self):
     """With an explicit offset, the result matches offset + minutes down to the second."""
     base = utcnow() + timedelta(minutes=10)
     expected = base + timedelta(minutes=5)
     actual = get_expiry_date(minutes=5, offset=base)
     # Compare component by component, from the coarsest to the finest.
     for field in ('year', 'month', 'day', 'hour', 'minute', 'second'):
         self.assertEqual(getattr(expected, field), getattr(actual, field))
开发者ID:akintolga,项目名称:superdesk-core,代码行数:10,代码来源:utc_tests.py


示例17: on_update

 def on_update(self, updates, original):
     """Normalise ``content_expiry`` and re-stamp the expiry of the stage's documents.

     A ``content_expiry`` of 0 is replaced by ``CONTENT_EXPIRY_MINUTES`` from the app
     settings. When a content expiry is being set, every document returned by
     ``get_stage_documents`` gets a new expiry computed from its ``versioncreated``.

     :param updates: fields being changed (may be modified in place).
     :param original: the stored stage document.
     """
     if updates.get('content_expiry') == 0:
         updates['content_expiry'] = app.settings['CONTENT_EXPIRY_MINUTES']
     super().on_update(updates, original)
     if not updates.get('content_expiry', None):
         return
     stage_id = str(original['_id'])
     for stage_doc in self.get_stage_documents(stage_id):
         new_expiry = get_expiry_date(updates['content_expiry'], stage_doc['versioncreated'])
         get_model(ItemModel).update({'_id': stage_doc['_id']}, {'expiry': new_expiry})
开发者ID:marwoodandrew,项目名称:superdesk-server,代码行数:10,代码来源:stages.py


示例18: _get_spike_expiry

 def _get_spike_expiry(self, desk_id, stage_id):
     """
     Work out the expiry to use for a spiked item.

     A configured SPIKE_EXPIRY_MINUTES setting takes precedence and is turned into an
     expiry date. When the setting is None, the desk/stage expiry is returned instead.
     :param desk_id: desk passed on to get_expiry
     :param stage_id: stage passed on to get_expiry
     :return: the expiry value for the spiked item
     """
     spike_minutes = app.settings['SPIKE_EXPIRY_MINUTES']
     if spike_minutes is not None:
         return get_expiry_date(spike_minutes)
     # No spike expiry configured: fall back to the desk/stage values.
     return get_expiry(desk_id=desk_id, stage_id=stage_id)
开发者ID:liveblog,项目名称:superdesk-core,代码行数:13,代码来源:archive_spike.py


示例19: test_query_removing_media_files_keeps

    def test_query_removing_media_files_keeps(self):
        """remove_media_files reports nothing deleted for the archived picture.

        The same renditions are also stored in the ingest, archive_versions and legal
        archive collections, which is presumably why the files have to be kept.
        """
        spiked_picture = {
            'state': 'spiked',
            'expiry': get_expiry_date(-10),
            'type': 'picture',
            'renditions': self.media,
        }
        self.app.data.insert(ARCHIVE, [spiked_picture])

        # A fresh dict per insert: the documents must not be shared between collections.
        for collection in ('ingest', 'archive_versions'):
            self.app.data.insert(collection, [{'type': 'picture', 'renditions': self.media}])
        for collection in ('legal_archive', 'legal_archive_versions'):
            self.app.data.insert(collection, [{'_id': 1, 'type': 'picture', 'renditions': self.media}])

        stored = self.app.data.find_all('archive', None)
        self.assertEqual(stored.count(), 1)
        self.assertFalse(remove_media_files(stored[0]))
开发者ID:hlmnrmr,项目名称:superdesk-core,代码行数:15,代码来源:archive_test.py


示例20: test_query_removing_media_files_keeps

    def test_query_removing_media_files_keeps(self):
        """remove_media_files reports nothing deleted for the archived picture.

        Runs inside an application context. The same renditions are also stored in the
        ingest, archive_versions and legal archive collections, which is presumably why
        the files have to be kept.
        """
        with self.app.app_context():
            spiked_picture = {
                "state": "spiked",
                "expiry": get_expiry_date(-10),
                "type": "picture",
                "renditions": self.media,
            }
            self.app.data.insert(ARCHIVE, [spiked_picture])

            # A fresh dict per insert: the documents must not be shared between collections.
            for collection in ("ingest", "archive_versions"):
                self.app.data.insert(collection, [{"type": "picture", "renditions": self.media}])
            for collection in ("legal_archive", "legal_archive_versions"):
                self.app.data.insert(collection, [{"_id": 1, "type": "picture", "renditions": self.media}])

            stored = self.app.data.find_all("archive", None)
            self.assertEqual(stored.count(), 1)
            self.assertFalse(remove_media_files(stored[0]))
开发者ID:verifiedpixel,项目名称:superdesk,代码行数:16,代码来源:archive_test.py



注:本文中的superdesk.utc.get_expiry_date函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utc.utcnow函数代码示例发布时间:2022-05-27
下一篇:
Python tests.setup函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap