本文整理汇总了Python中superdesk.utc.get_expiry_date函数的典型用法代码示例。如果您正苦于以下问题:Python get_expiry_date函数的具体用法?Python get_expiry_date怎么用?Python get_expiry_date使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_expiry_date函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __set_published_item_expiry
def __set_published_item_expiry(self, doc):
    """Store the expiry date of a published item in ``doc['expiry']``.

    The number of minutes is the ``published_item_expiry`` value of the desk
    referenced by ``doc['task']['desk']``. When the document references no
    desk, or the desk has no such value,
    ``config.PUBLISHED_ITEMS_EXPIRY_MINUTES`` is used instead. A truthy
    ``doc[EMBARGO]`` is forwarded to ``get_expiry_date`` as ``offset``.

    :param doc: item document; modified in place.
    """
    task = doc.get('task', {})
    desk_id = task.get('desk', None)
    # Only look the desk up when the document actually references one.
    desk = get_resource_service('desks').find_one(req=None, _id=desk_id) if desk_id else {}
    expiry_minutes = desk.get('published_item_expiry', config.PUBLISHED_ITEMS_EXPIRY_MINUTES)

    if doc.get(EMBARGO):
        doc['expiry'] = get_expiry_date(expiry_minutes, offset=doc[EMBARGO])
    else:
        doc['expiry'] = get_expiry_date(expiry_minutes)
开发者ID:PressAssociation,项目名称:superdesk,代码行数:10,代码来源:published_item.py
示例2: test_query_getting_overdue_scheduled_content
def test_query_getting_overdue_scheduled_content(self):
    """Check that ``get_overdue_scheduled_items`` reports exactly one item.

    Five archive items are inserted with different ``publish_schedule`` and
    ``state`` values; the query is run with the current time and the
    ``'archive'`` resource name.
    """
    self.app.data.insert(ARCHIVE, [{'publish_schedule': get_expiry_date(-10), 'state': 'published'}])
    self.app.data.insert(ARCHIVE, [{'publish_schedule': get_expiry_date(-10), 'state': 'scheduled'}])
    self.app.data.insert(ARCHIVE, [{'publish_schedule': get_expiry_date(0), 'state': 'spiked'}])
    self.app.data.insert(ARCHIVE, [{'publish_schedule': get_expiry_date(10), 'state': 'scheduled'}])
    self.app.data.insert(ARCHIVE, [{'unique_id': 97, 'state': 'spiked'}])

    now = date_to_str(utcnow())
    overdue_items = get_overdue_scheduled_items(now, 'archive')
    # assertEquals is a deprecated alias that was removed in Python 3.12.
    self.assertEqual(1, overdue_items.count())
开发者ID:copyfun,项目名称:superdesk,代码行数:10,代码来源:archive_test.py
示例3: test_get_expiry_date
def test_get_expiry_date(self):
    """``get_expiry_date(minutes=60)`` returns a datetime 60 minutes from now."""
    self.assertIsInstance(get_expiry_date(minutes=60), datetime)

    expected = utcnow() + timedelta(minutes=60)
    actual = get_expiry_date(minutes=60)

    # Comparing year/month/.../second one field at a time fails spuriously
    # whenever the clock ticks over to the next second between the two calls
    # above. A one-second tolerance on the whole value removes that flakiness.
    # NOTE(review): assumes both values are mutually subtractable datetimes
    # (both naive or both timezone-aware) -- confirm against superdesk.utc.
    self.assertAlmostEqual(expected, actual, delta=timedelta(seconds=1))
开发者ID:akintolga,项目名称:superdesk-core,代码行数:10,代码来源:utc_tests.py
示例4: setUp
def setUp(self):
    """Seed the ``archive`` collection and initialise the app for each test.

    Inserts five items whose ``expiry`` is -10, 0, 10, 20 and 30 minutes from
    now, one item with ``expiry`` set to ``None`` and one item without an
    ``expiry`` key, then calls ``init_app``.
    """
    super().setUp()
    with self.app.app_context():
        for minutes in (-10, 0, 10, 20, 30):
            self.app.data.insert('archive', [{'expiry': get_expiry_date(minutes)}])
        for extra in ({'expiry': None}, {'unique_id': 97}):
            self.app.data.insert('archive', [extra])
        init_app(self.app)
开发者ID:akintolga,项目名称:superdesk-server,代码行数:11,代码来源:archive_test.py
示例5: test_query_getting_overdue_scheduled_content
def test_query_getting_overdue_scheduled_content(self):
    """Check that ``get_overdue_scheduled_items`` reports exactly one item.

    Runs inside an application context: five archive items with different
    ``publish_schedule`` / ``state`` values are inserted and the query is run
    with the current time and the ``"archive"`` resource name.
    """
    with self.app.app_context():
        self.app.data.insert(ARCHIVE, [{"publish_schedule": get_expiry_date(-10), "state": "published"}])
        self.app.data.insert(ARCHIVE, [{"publish_schedule": get_expiry_date(-10), "state": "scheduled"}])
        self.app.data.insert(ARCHIVE, [{"publish_schedule": get_expiry_date(0), "state": "spiked"}])
        self.app.data.insert(ARCHIVE, [{"publish_schedule": get_expiry_date(10), "state": "scheduled"}])
        self.app.data.insert(ARCHIVE, [{"unique_id": 97, "state": "spiked"}])

        now = date_to_str(utcnow())
        overdue_items = get_overdue_scheduled_items(now, "archive")
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(1, overdue_items.count())
开发者ID:verifiedpixel,项目名称:superdesk,代码行数:11,代码来源:archive_test.py
示例6: test_query_getting_expired_content
def test_query_getting_expired_content(self):
    """Check that the archive service's ``get_expired_items`` finds one item.

    Seven spiked items are inserted: five with ``expiry`` at -10, 0, 10, 20
    and 30 minutes from now, one with ``expiry`` ``None`` and one with no
    ``expiry`` key at all.
    """
    for minutes in (-10, 0, 10, 20, 30):
        self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(minutes), 'state': 'spiked'}])
    self.app.data.insert(ARCHIVE, [{'expiry': None, 'state': 'spiked'}])
    self.app.data.insert(ARCHIVE, [{'unique_id': 97, 'state': 'spiked'}])

    now = utcnow()
    expired_items = get_resource_service(ARCHIVE).get_expired_items(now)
    # assertEquals is a deprecated alias that was removed in Python 3.12.
    self.assertEqual(1, expired_items.count())
开发者ID:hlmnrmr,项目名称:superdesk-core,代码行数:12,代码来源:archive_test.py
示例7: test_query_getting_expired_content
def test_query_getting_expired_content(self):
    """Check that ``RemoveExpiredSpikeContent.get_expired_items`` finds two items.

    Runs inside an application context. Seven spiked items are inserted: five
    with ``expiry`` at -10, 0, 10, 20 and 30 minutes from now, one with
    ``expiry`` ``None`` and one with no ``expiry`` key at all.
    """
    with self.app.app_context():
        for minutes in (-10, 0, 10, 20, 30):
            self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(minutes), 'state': 'spiked'}])
        self.app.data.insert(ARCHIVE, [{'expiry': None, 'state': 'spiked'}])
        self.app.data.insert(ARCHIVE, [{'unique_id': 97, 'state': 'spiked'}])

        now = date_to_str(utcnow())
        expired_items = RemoveExpiredSpikeContent().get_expired_items(now)
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(2, expired_items.count())
开发者ID:verifiedpixel,项目名称:verifiedpixel,代码行数:13,代码来源:archive_test.py
示例8: test_query_getting_expired_content
def test_query_getting_expired_content(self):
    """Check that ``RemoveExpiredSpikeContent.get_expired_items`` finds two items.

    Runs inside an application context. Seven spiked items are inserted: five
    with ``expiry`` at -10, 0, 10, 20 and 30 minutes from now, one with
    ``expiry`` ``None`` and one with no ``expiry`` key at all.
    """
    with self.app.app_context():
        for minutes in (-10, 0, 10, 20, 30):
            self.app.data.insert(ARCHIVE, [{"expiry": get_expiry_date(minutes), "state": "spiked"}])
        self.app.data.insert(ARCHIVE, [{"expiry": None, "state": "spiked"}])
        self.app.data.insert(ARCHIVE, [{"unique_id": 97, "state": "spiked"}])

        now = date_to_str(utcnow())
        expired_items = RemoveExpiredSpikeContent().get_expired_items(now)
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(2, expired_items.count())
开发者ID:verifiedpixel,项目名称:superdesk,代码行数:13,代码来源:archive_test.py
示例9: on_update
def on_update(self, updates, original):
    """Apply expiry and default-stage rules before a stage is updated.

    * A ``content_expiry`` of ``0`` in ``updates`` is replaced by
      ``app.settings['CONTENT_EXPIRY_MINUTES']``.
    * When ``updates`` carries a truthy ``content_expiry``, every document
      returned by ``get_stage_documents`` gets a new ``expiry`` computed from
      its ``versioncreated``.
    * Turning on ``working_stage`` / ``default_incoming`` on a stage that did
      not have it replaces the desk's previous default; an update that contains
      the key with a falsy value while the original has it set is rejected.

    :param updates: changes about to be applied; may be modified in place.
    :param original: the stored stage document.
    :raises SuperdeskApiError: (forbidden) when the update would leave the desk
        without a working stage or without an incoming stage.
    """
    if updates.get('content_expiry') == 0:
        updates['content_expiry'] = app.settings['CONTENT_EXPIRY_MINUTES']

    super().on_update(updates, original)

    if updates.get('content_expiry', None):
        original_id = str(original[config.ID_FIELD])
        for stage_doc in self.get_stage_documents(original_id):
            new_expiry = get_expiry_date(updates['content_expiry'], stage_doc['versioncreated'])
            get_model(ItemModel).update({'_id': stage_doc[config.ID_FIELD]}, {'expiry': new_expiry})

    if updates.get('working_stage', False):
        if not original.get('working_stage'):
            self.remove_old_default(original.get('desk'), 'working_stage')
            self.set_desk_ref(original, 'working_stage')
    elif original.get('working_stage') and 'working_stage' in updates:
        raise SuperdeskApiError.forbiddenError(message='Must have one working stage in a desk')

    if updates.get('default_incoming', False):
        if not original.get('default_incoming'):
            self.remove_old_default(original.get('desk'), 'default_incoming')
            self.set_desk_ref(original, 'incoming_stage')
    elif original.get('default_incoming') and 'default_incoming' in updates:
        raise SuperdeskApiError.forbiddenError(message='Must have one incoming stage in a desk')
开发者ID:nistormihai,项目名称:superdesk,代码行数:28,代码来源:stages.py
示例10: update
def update(self, id, updates, original):
    """Spike an item and return the updated document.

    Adds ``EXPIRY`` and ``REVERT_STATE`` to ``updates`` (and clears
    ``rewrite_of`` when the original has one), persists the change through
    ``self.backend``, pushes an ``item:spike`` notification and removes the
    item's references from packages.

    :param id: identifier of the item being spiked.
    :param updates: changes to apply; modified in place.
    :param original: the stored item.
    :return: the value returned by ``self.backend.update``.
    :raises InvalidStateTransitionError: if ``'spike'`` is not a valid
        transition from the original item's state.
    """
    if not is_workflow_state_transition_valid('spike', original[ITEM_STATE]):
        raise InvalidStateTransitionError()

    packages = PackageService()
    current_user = get_user(required=True)
    archived = get_resource_service(ARCHIVE).find_one(req=None, _id=id)

    # Start from the application-wide setting; an item that is assigned to a
    # desk uses that desk's spike_expiry when the desk defines one.
    spike_minutes = app.settings['SPIKE_EXPIRY_MINUTES']
    if is_assigned_to_a_desk(archived):
        desk = get_resource_service('desks').find_one(_id=archived['task']['desk'], req=None)
        spike_minutes = desk.get('spike_expiry', spike_minutes)

    updates[EXPIRY] = get_expiry_date(spike_minutes)
    updates[REVERT_STATE] = archived.get(ITEM_STATE, None)
    if original.get('rewrite_of'):
        updates['rewrite_of'] = None

    spiked = self.backend.update(self.datasource, id, updates, original)
    push_notification('item:spike', item=str(spiked.get('_id')), user=str(current_user))
    packages.remove_spiked_refs_from_package(id)
    return spiked
开发者ID:verifiedpixel,项目名称:verifiedpixel,代码行数:26,代码来源:archive_spike.py
示例11: update
def update(self, id, updates, original):
    """Spike an item and return the updated document.

    Adds ``EXPIRY`` and ``REVERT_STATE`` to ``updates`` (and clears
    ``rewrite_of`` when the original has one), persists the change through
    ``self.backend``, pushes an ``item:spike`` notification and removes the
    item's references from packages.

    :param id: identifier of the item being spiked.
    :param updates: changes to apply; modified in place.
    :param original: the stored item.
    :return: the value returned by ``self.backend.update``.
    :raises InvalidStateTransitionError: if ``"spike"`` is not a valid
        transition from the original item's state.
    """
    if not is_workflow_state_transition_valid("spike", original[config.CONTENT_STATE]):
        raise InvalidStateTransitionError()

    packages = PackageService()
    current_user = get_user(required=True)
    archived = get_resource_service(ARCHIVE).find_one(req=None, _id=id)

    # Start from the application-wide setting; an item that is assigned to a
    # desk uses that desk's spike_expiry when the desk defines one.
    spike_minutes = app.settings["SPIKE_EXPIRY_MINUTES"]
    if is_assigned_to_a_desk(archived):
        desk = get_resource_service("desks").find_one(_id=archived["task"]["desk"], req=None)
        spike_minutes = desk.get("spike_expiry", spike_minutes)

    updates[EXPIRY] = get_expiry_date(spike_minutes)
    updates[REVERT_STATE] = archived.get(app.config["CONTENT_STATE"], None)
    if original.get("rewrite_of"):
        updates["rewrite_of"] = None

    spiked = self.backend.update(self.datasource, id, updates, original)
    push_notification("item:spike", item=str(spiked.get("_id")), user=str(current_user))
    packages.remove_spiked_refs_from_package(id)
    return spiked
开发者ID:oxcarh,项目名称:superdesk,代码行数:26,代码来源:archive_spike.py
示例12: ingest_item
def ingest_item(item, provider, rule_set=None, routing_scheme=None):
    """Prepare an incoming item and store it in the ``ingest`` collection.

    The item is given an id (if it has none), family id, provider reference,
    source, default state and expiry; categories / subjects are processed or
    derived; the rule set is applied; renditions are updated; and the item is
    either written over an existing ingest item with the same guid or posted
    as a new one. If a routing scheme is given, it is applied to the stored
    item.

    :param item: the item to ingest; modified in place.
    :param provider: the ingest provider document the item came from.
    :param rule_set: passed through to ``apply_rule_set``.
    :param routing_scheme: optional routing scheme to apply after storing.
    :return: ``True`` on success, ``False`` if an exception was raised (the
        exception is logged and, best effort, reported to sentry).
    """
    try:
        item.setdefault(superdesk.config.ID_FIELD, generate_guid(type=GUID_NEWSML))
        item[FAMILY_ID] = item[superdesk.config.ID_FIELD]
        providers[provider.get('type')].provider = provider
        item['ingest_provider'] = str(provider[superdesk.config.ID_FIELD])
        item.setdefault('source', provider.get('source', ''))
        set_default_state(item, STATE_INGESTED)
        item['expiry'] = get_expiry_date(provider.get('content_expiry', app.config['INGEST_EXPIRY_MINUTES']),
                                         item.get('versioncreated'))

        if 'anpa_category' in item:
            process_anpa_category(item, provider)

        if 'subject' in item:
            process_iptc_codes(item, provider)
            if 'anpa_category' not in item:
                derive_category(item, provider)
        elif 'anpa_category' in item:
            derive_subject(item)

        apply_rule_set(item, provider, rule_set)

        ingest_service = superdesk.get_resource_service('ingest')
        if item.get('ingest_provider_sequence') is None:
            ingest_service.set_ingest_provider_sequence(item, provider)

        old_item = ingest_service.find_one(guid=item[GUID_FIELD], req=None)

        rend = item.get('renditions', {})
        if rend:
            # Prefer the 'baseImage' rendition, otherwise take whichever comes first.
            baseImageRend = rend.get('baseImage') or next(iter(rend.values()))
            if baseImageRend:
                href = providers[provider.get('type')].prepare_href(baseImageRend['href'])
                update_renditions(item, href, old_item)

        if old_item:
            # In case we already have the item, preserve the _id
            item[superdesk.config.ID_FIELD] = old_item[superdesk.config.ID_FIELD]
            ingest_service.put_in_mongo(item[superdesk.config.ID_FIELD], item)
        else:
            try:
                ingest_service.post_in_mongo([item])
            except HTTPException:
                # The original passed the exception as a %-format argument to a
                # message without a placeholder, which makes the logging module
                # report a formatting error instead of the real problem.
                # logger.exception keeps the message and attaches the traceback.
                logger.exception("Exception while persisting item in ingest collection")

        if routing_scheme:
            routed = ingest_service.find_one(_id=item[superdesk.config.ID_FIELD], req=None)
            superdesk.get_resource_service('routing_schemes').apply_routing_scheme(routed, provider, routing_scheme)
    except Exception as ex:
        logger.exception(ex)
        try:
            superdesk.app.sentry.captureException()
        except Exception:
            # Reporting to sentry is best effort; narrowed from a bare except so
            # KeyboardInterrupt / SystemExit are no longer swallowed here.
            pass
        return False
    return True
开发者ID:ancafarcas,项目名称:superdesk-core,代码行数:59,代码来源:update_ingest.py
示例13: test_query_getting_expired_content
def test_query_getting_expired_content(self):
    """Check every batch yielded by ``get_expired_items`` is the one expired item.

    Seven spiked items are inserted; only the first one (``expiry`` ten
    minutes before ``now``) carries ``unique_id == 'expired'``.
    """
    now = utcnow()
    self.app.data.insert(ARCHIVE, [{'expiry': now - timedelta(minutes=10), 'state': 'spiked',
                                    'unique_id': 'expired'}])
    self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(0), 'state': 'spiked'}])
    self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(10), 'state': 'spiked'}])
    self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(20), 'state': 'spiked'}])
    self.app.data.insert(ARCHIVE, [{'expiry': get_expiry_date(30), 'state': 'spiked'}])
    self.app.data.insert(ARCHIVE, [{'expiry': None, 'state': 'spiked'}])
    self.app.data.insert(ARCHIVE, [{'unique_id': 97, 'state': 'spiked'}])

    # NOTE(review): this result is never read -- the loop variable below
    # rebinds the same name. Kept as-is; confirm whether it can be dropped.
    expired_items = get_resource_service(ARCHIVE).get_expired_items(now)
    now = utcnow()
    for expired_items in get_resource_service(ARCHIVE).get_expired_items(now):
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(1, len(expired_items))
        self.assertEqual('expired', expired_items[0]['unique_id'])
开发者ID:superdesk,项目名称:superdesk-core,代码行数:17,代码来源:archive_test.py
示例14: __set_published_item_expiry
def __set_published_item_expiry(self, doc):
    """Store the expiry date of a published item in ``doc["expiry"]``.

    The number of minutes is the ``published_item_expiry`` value of the desk
    referenced by ``doc["task"]["desk"]``. When the document references no
    desk, or the desk has no such value,
    ``config.PUBLISHED_ITEMS_EXPIRY_MINUTES`` is used instead.

    :param doc: item document; modified in place.
    """
    task = doc.get("task", {})
    desk_id = task.get("desk", None)
    # Only look the desk up when the document actually references one.
    desk = get_resource_service("desks").find_one(req=None, _id=desk_id) if desk_id else {}
    minutes = desk.get("published_item_expiry", config.PUBLISHED_ITEMS_EXPIRY_MINUTES)
    doc["expiry"] = get_expiry_date(minutes)
开发者ID:m038,项目名称:superdesk,代码行数:9,代码来源:published_item.py
示例15: ingest_item
def ingest_item(item, provider, rule_set=None, routing_scheme=None):
    """Prepare an incoming item and store it in the ``ingest`` collection.

    The item is given an id (if it has none), family id, provider reference,
    source, default state and expiry; categories and subjects are processed;
    the rule set is applied; renditions are updated; and the item is either
    written over an existing ingest item with the same guid or posted as a new
    one. If a routing scheme is given, it is applied to the stored item.

    :param item: the item to ingest; modified in place.
    :param provider: the ingest provider document the item came from.
    :param rule_set: passed through to ``apply_rule_set``.
    :param routing_scheme: optional routing scheme to apply after storing.
    :return: ``True`` on success, ``False`` if an exception was raised (the
        exception is logged and, best effort, reported to sentry).
    """
    try:
        item.setdefault(superdesk.config.ID_FIELD, generate_guid(type=GUID_NEWSML))
        item[FAMILY_ID] = item[superdesk.config.ID_FIELD]
        providers[provider.get("type")].provider = provider
        item["ingest_provider"] = str(provider[superdesk.config.ID_FIELD])
        item.setdefault("source", provider.get("source", ""))
        set_default_state(item, STATE_INGESTED)
        item["expiry"] = get_expiry_date(
            provider.get("content_expiry", INGEST_EXPIRY_MINUTES), item.get("versioncreated")
        )

        if "anpa-category" in item:
            process_anpa_category(item, provider)

        if "subject" in item:
            process_iptc_codes(item, provider)

        apply_rule_set(item, provider, rule_set)

        ingest_service = superdesk.get_resource_service("ingest")
        if item.get("ingest_provider_sequence") is None:
            ingest_service.set_ingest_provider_sequence(item, provider)

        rend = item.get("renditions", {})
        if rend:
            # Prefer the "baseImage" rendition, otherwise take whichever comes first.
            baseImageRend = rend.get("baseImage") or next(iter(rend.values()))
            if baseImageRend:
                href = providers[provider.get("type")].prepare_href(baseImageRend["href"])
                update_renditions(item, href)

        old_item = ingest_service.find_one(guid=item[GUID_FIELD], req=None)
        if old_item:
            # In case we already have the item, preserve the _id
            item[superdesk.config.ID_FIELD] = old_item[superdesk.config.ID_FIELD]
            ingest_service.put_in_mongo(item[superdesk.config.ID_FIELD], item)
        else:
            try:
                ingest_service.post_in_mongo([item])
            except HTTPException:
                # The original passed the exception as a %-format argument to a
                # message without a placeholder, which makes the logging module
                # report a formatting error instead of the real problem.
                # logger.exception keeps the message and attaches the traceback.
                logger.exception("Exception while persisting item in ingest collection")

        if routing_scheme:
            routed = ingest_service.find_one(_id=item[superdesk.config.ID_FIELD], req=None)
            superdesk.get_resource_service("routing_schemes").apply_routing_scheme(routed, provider, routing_scheme)
    except Exception as ex:
        logger.exception(ex)
        try:
            superdesk.app.sentry.captureException()
        except Exception:
            # Reporting to sentry is best effort; narrowed from a bare except so
            # KeyboardInterrupt / SystemExit are no longer swallowed here.
            pass
        return False
    return True
开发者ID:Flowdeeps,项目名称:superdesk-1,代码行数:56,代码来源:update_ingest.py
示例16: test_get_expiry_date_with_offset
def test_get_expiry_date_with_offset(self):
    """``get_expiry_date`` with an ``offset`` counts the minutes from that offset.

    The expected value is the offset plus five minutes; both datetimes are
    compared field by field from year down to second.
    """
    offset = utcnow() + timedelta(minutes=10)
    expected = offset + timedelta(minutes=5)
    actual = get_expiry_date(minutes=5, offset=offset)

    for field in ('year', 'month', 'day', 'hour', 'minute', 'second'):
        self.assertEqual(getattr(expected, field), getattr(actual, field))
开发者ID:akintolga,项目名称:superdesk-core,代码行数:10,代码来源:utc_tests.py
示例17: on_update
def on_update(self, updates, original):
    """Apply the content-expiry rules before a stage is updated.

    A ``content_expiry`` of ``0`` in ``updates`` is replaced by
    ``app.settings['CONTENT_EXPIRY_MINUTES']``. When ``updates`` then carries
    a truthy ``content_expiry``, every document returned by
    ``get_stage_documents`` gets a new ``expiry`` computed from its
    ``versioncreated``.

    :param updates: changes about to be applied; may be modified in place.
    :param original: the stored stage document.
    """
    if updates.get('content_expiry') == 0:
        updates['content_expiry'] = app.settings['CONTENT_EXPIRY_MINUTES']

    super().on_update(updates, original)

    if updates.get('content_expiry', None):
        original_id = str(original['_id'])
        for stage_doc in self.get_stage_documents(original_id):
            new_expiry = get_expiry_date(updates['content_expiry'], stage_doc['versioncreated'])
            get_model(ItemModel).update({'_id': stage_doc['_id']}, {'expiry': new_expiry})
开发者ID:marwoodandrew,项目名称:superdesk-server,代码行数:10,代码来源:stages.py
示例18: _get_spike_expiry
def _get_spike_expiry(self, desk_id, stage_id):
    """
    Return the expiry to use for a spiked item.

    A configured ``SPIKE_EXPIRY_MINUTES`` setting takes precedence and is
    turned into a date with ``get_expiry_date``. When the setting is ``None``
    the result of ``get_expiry`` for the given desk and stage is returned.

    :param desk_id: desk identifier forwarded to ``get_expiry``
    :param stage_id: stage identifier forwarded to ``get_expiry``
    :return: the value of ``get_expiry_date`` or ``get_expiry``, as described above
    """
    spike_minutes = app.settings['SPIKE_EXPIRY_MINUTES']
    if spike_minutes is not None:
        return get_expiry_date(spike_minutes)

    # No maximum spike expiry configured: fall back to the desk/stage values.
    return get_expiry(desk_id=desk_id, stage_id=stage_id)
开发者ID:liveblog,项目名称:superdesk-core,代码行数:13,代码来源:archive_spike.py
示例19: test_query_removing_media_files_keeps
def test_query_removing_media_files_keeps(self):
    """``remove_media_files`` returns a falsy value when renditions are shared.

    A spiked, expired picture is inserted into the archive, and pictures with
    the same ``self.media`` renditions are inserted into four other
    collections, before ``remove_media_files`` is called on the archive item.
    """
    self.app.data.insert(ARCHIVE, [{'state': 'spiked',
                                    'expiry': get_expiry_date(-10),
                                    'type': 'picture',
                                    'renditions': self.media}])
    # A fresh dict is built for every insert rather than sharing one object.
    for collection in ('ingest', 'archive_versions'):
        self.app.data.insert(collection, [{'type': 'picture', 'renditions': self.media}])
    for collection in ('legal_archive', 'legal_archive_versions'):
        self.app.data.insert(collection, [{'_id': 1, 'type': 'picture', 'renditions': self.media}])

    stored = self.app.data.find_all('archive', None)
    self.assertEqual(stored.count(), 1)
    self.assertFalse(remove_media_files(stored[0]))
开发者ID:hlmnrmr,项目名称:superdesk-core,代码行数:15,代码来源:archive_test.py
示例20: test_query_removing_media_files_keeps
def test_query_removing_media_files_keeps(self):
    """``remove_media_files`` returns a falsy value when renditions are shared.

    Runs inside an application context. A spiked, expired picture is inserted
    into the archive, and pictures with the same ``self.media`` renditions are
    inserted into four other collections, before ``remove_media_files`` is
    called on the archive item.
    """
    with self.app.app_context():
        self.app.data.insert(
            ARCHIVE,
            [{"state": "spiked", "expiry": get_expiry_date(-10), "type": "picture", "renditions": self.media}],
        )
        # A fresh dict is built for every insert rather than sharing one object.
        for collection in ("ingest", "archive_versions"):
            self.app.data.insert(collection, [{"type": "picture", "renditions": self.media}])
        for collection in ("legal_archive", "legal_archive_versions"):
            self.app.data.insert(collection, [{"_id": 1, "type": "picture", "renditions": self.media}])

        stored = self.app.data.find_all("archive", None)
        self.assertEqual(stored.count(), 1)
        self.assertFalse(remove_media_files(stored[0]))
开发者ID:verifiedpixel,项目名称:superdesk,代码行数:16,代码来源:archive_test.py
注:本文中的superdesk.utc.get_expiry_date函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论