• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python listeners.archive_callback函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中website.archiver.listeners.archive_callback函数的典型用法代码示例。如果您正苦于以下问题:Python archive_callback函数的具体用法?Python archive_callback怎么用?Python archive_callback使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了archive_callback函数的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_archive_callback_done_errors

 def test_archive_callback_done_errors(self):
     self.dst.archive_job.update_target('dropbox', ARCHIVER_SUCCESS)
     self.dst.archive_job.update_target('osfstorage', ARCHIVER_FAILURE)
     self.dst.archive_job.save()
     with mock.patch('website.archiver.utils.handle_archive_fail') as mock_fail:
         listeners.archive_callback(self.dst)
     assert(mock_fail.called_with(ARCHIVER_NETWORK_ERROR, self.src, self.dst, self.user, self.dst.archive_job.target_addons))
开发者ID:bdyetton,项目名称:osf.io,代码行数:7,代码来源:test_archiver.py


示例2: test_archiving_nodes_not_added_to_search_on_archive_incomplete

 def test_archiving_nodes_not_added_to_search_on_archive_incomplete(self, mock_send, mock_update_search):
     proj = factories.ProjectFactory()
     reg = factories.RegistrationFactory(project=proj)
     reg.save()
     with mock.patch('website.archiver.model.ArchiveJob.archive_tree_finished', mock.Mock(return_value=False)):
         listeners.archive_callback(reg)
     assert_false(mock_update_search.called)
开发者ID:atelic,项目名称:osf.io,代码行数:7,代码来源:test_archiver.py


示例3: test_archive_callback_updates_archiving_state_when_done

 def test_archive_callback_updates_archiving_state_when_done(self):
     proj = factories.NodeFactory()
     factories.NodeFactory(parent=proj)
     reg = factories.RegistrationFactory(project=proj)
     reg.archive_job.update_target('osfstorage', ARCHIVER_INITIATED)
     child = reg.nodes[0]
     child.archive_job.update_target('osfstorage', ARCHIVER_SUCCESS)
     child.save()
     listeners.archive_callback(child)
     assert_false(child.archiving)
开发者ID:bdyetton,项目名称:osf.io,代码行数:10,代码来源:test_archiver.py


示例4: test_archiving_nodes_not_added_to_search_on_archive_failure

 def test_archiving_nodes_not_added_to_search_on_archive_failure(self, mock_send, mock_delete_index_node):
     proj = factories.ProjectFactory()
     reg = factories.RegistrationFactory(project=proj)
     reg.save()
     with nested(
             mock.patch('osf.models.archive.ArchiveJob.archive_tree_finished', mock.Mock(return_value=True)),
             mock.patch('osf.models.archive.ArchiveJob.success', mock.PropertyMock(return_value=False))
     ) as (mock_finished, mock_success):
         listeners.archive_callback(reg)
     assert_true(mock_delete_index_node.called)
开发者ID:adlius,项目名称:osf.io,代码行数:10,代码来源:test_archiver.py


示例5: test_archiving_nodes_added_to_search_on_archive_success_if_public

 def test_archiving_nodes_added_to_search_on_archive_success_if_public(self, mock_update_search, mock_send, mock_archive_success):
     proj = factories.ProjectFactory()
     reg = factories.RegistrationFactory(project=proj)
     reg.save()
     with nested(
         mock.patch('osf.models.ArchiveJob.archive_tree_finished', mock.Mock(return_value=True)),
         mock.patch('osf.models.ArchiveJob.success', mock.PropertyMock(return_value=True))
     ) as (mock_finished, mock_success):
         listeners.archive_callback(reg)
     assert_equal(mock_update_search.call_count, 1)
开发者ID:adlius,项目名称:osf.io,代码行数:10,代码来源:test_archiver.py


示例6: mock_archive

def mock_archive(project, schema=None, auth=None, data=None, parent=None,
                 autocomplete=True, autoapprove=False):
    """ A context manager for registrations. When you want to call Node#register_node in
    a test but do not want to deal with any of this side effects of archiver, this
    helper allows for creating a registration in a safe fashion.

    :param bool autocomplete: automatically finish archival?
    :param bool autoapprove: automatically approve registration approval?

    Example use:

    project = ProjectFactory()
    with mock_archive(project) as registration:
        assert_true(registration.is_registration)
        assert_true(registration.archiving)
        assert_true(registration.is_pending_registration)

    with mock_archive(project, autocomplete=True) as registration:
        assert_true(registration.is_registration)
        assert_false(registration.archiving)
        assert_true(registration.is_pending_registration)

    with mock_archive(project, autocomplete=True, autoapprove=True) as registration:
        assert_true(registration.is_registration)
        assert_false(registration.archiving)
        assert_false(registration.is_pending_registration)
    """
    schema = schema or DEFAULT_METASCHEMA
    auth = auth or Auth(project.creator)
    data = data or ''

    with mock.patch('framework.tasks.handlers.enqueue_task'):
        registration = project.register_node(
            schema=schema,
            auth=auth,
            data=data,
            parent=parent,
        )
    registration.root.require_approval(project.creator)
    if autocomplete:
        root_job = registration.root.archive_job
        root_job.status = ARCHIVER_SUCCESS
        root_job.sent = False
        root_job.done = True
        root_job.save()
        sanction = registration.root.sanction
        with contextlib.nested(
            mock.patch.object(root_job, 'archive_tree_finished', mock.Mock(return_value=True)),
            mock.patch.object(sanction, 'ask')
        ):
            archiver_listeners.archive_callback(registration)
    if autoapprove:
        sanction = registration.root.sanction
        sanction._on_complete(project.creator)
    yield registration
开发者ID:Alpani,项目名称:osf.io,代码行数:55,代码来源:utils.py


示例7: test_archiving_nodes_not_added_to_search_on_archive_failure

 def test_archiving_nodes_not_added_to_search_on_archive_failure(self, mock_send, mock_update_search):
     proj = factories.ProjectFactory()
     reg = factories.RegistrationFactory(project=proj)
     reg.save()
     with nested(
             mock.patch('website.archiver.model.ArchiveJob.archive_tree_finished', mock.Mock(return_value=True)),
             mock.patch('website.archiver.model.ArchiveJob.sent', mock.PropertyMock(return_value=False)),
             mock.patch('website.archiver.model.ArchiveJob.success', mock.PropertyMock(return_value=False))
     ) as (mock_finished, mock_sent, mock_success):
         listeners.archive_callback(reg)
     mock_update_search.assert_not_called()
开发者ID:Alpani,项目名称:osf.io,代码行数:11,代码来源:test_archiver.py


示例8: test_archive_callback_done_embargoed

 def test_archive_callback_done_embargoed(self, mock_send, mock_archive_success):
     end_date = timezone.now() + datetime.timedelta(days=30)
     self.dst.archive_job.meta = {
         'embargo_urls': {
             contrib._id: None
             for contrib in self.dst.contributors
         }
     }
     self.dst.embargo_registration(self.user, end_date)
     self.dst.archive_job.update_target('osfstorage', ARCHIVER_SUCCESS)
     self.dst.save()
     listeners.archive_callback(self.dst)
     assert_equal(mock_send.call_count, 1)
开发者ID:adlius,项目名称:osf.io,代码行数:13,代码来源:test_archiver.py


示例9: test_archive_callback_done_embargoed

 def test_archive_callback_done_embargoed(self, mock_send):
     end_date = datetime.datetime.now() + datetime.timedelta(days=30)
     self.dst.archive_job.meta = {
         'embargo_urls': {
             contrib._id: None
             for contrib in self.dst.contributors
         }
     }
     self.dst.embargo_registration(self.user, end_date)
     for addon in ['osfstorage', 'dropbox']:
         self.dst.archive_job.update_target(addon, ARCHIVER_SUCCESS)
     self.dst.save()
     listeners.archive_callback(self.dst)
     mock_send.assert_called_with(self.dst, self.user, urls=None)
开发者ID:bdyetton,项目名称:osf.io,代码行数:14,代码来源:test_archiver.py


示例10: test_archive_callback_pending

 def test_archive_callback_pending(self):
     for addon in ['osfstorage', 'dropbox']:
         self.archive_job.update_target(
             addon,
             ARCHIVER_INITIATED
         )
     self.dst.archive_job.update_target(
         'osfstorage',
         ARCHIVER_SUCCESS
     )
     self.dst.archive_job.save()
     with mock.patch('website.archiver.utils.send_archiver_success_mail') as mock_send:
         with mock.patch('website.archiver.utils.handle_archive_fail') as mock_fail:
             listeners.archive_callback(self.dst)
     assert_false(mock_send.called)
     assert_false(mock_fail.called)
开发者ID:bdyetton,项目名称:osf.io,代码行数:16,代码来源:test_archiver.py


示例11: test_archive_callback_pending

 def test_archive_callback_pending(self, mock_delay):
     self.archive_job.update_target(
         'osfstorage',
         ARCHIVER_INITIATED
     )
     self.dst.archive_job.update_target(
         'osfstorage',
         ARCHIVER_SUCCESS
     )
     self.dst.archive_job.save()
     with mock.patch('website.mails.send_mail') as mock_send:
         with mock.patch('website.archiver.utils.handle_archive_fail') as mock_fail:
             listeners.archive_callback(self.dst)
     assert_false(mock_send.called)
     assert_false(mock_fail.called)
     assert_true(mock_delay.called)
开发者ID:adlius,项目名称:osf.io,代码行数:16,代码来源:test_archiver.py


示例12: test_archive_callback_on_tree_sends_only_one_email

 def test_archive_callback_on_tree_sends_only_one_email(self, mock_send_success):
     proj = factories.NodeFactory()
     child = factories.NodeFactory(parent=proj)
     factories.NodeFactory(parent=child)
     reg = factories.RegistrationFactory(project=proj)
     rchild = reg.nodes[0]
     rchild2 = rchild.nodes[0]
     for node in [reg, rchild, rchild2]:
         for addon in ['dropbox', 'osfstorage']:
             node.archive_job._set_target(addon)
     for node in [reg, rchild, rchild2]:
         for addon in ['dropbox', 'osfstorage']:
             node.archive_job.update_target(addon, ARCHIVER_INITIATED)
     for addon in ['dropbox', 'osfstorage']:
         rchild.archive_job.update_target(addon, ARCHIVER_SUCCESS)
     rchild.save()
     listeners.archive_callback(rchild)
     mock_send_success.assert_not_called()
     for addon in ['dropbox', 'osfstorage']:
         reg.archive_job.update_target(addon, ARCHIVER_SUCCESS)
     reg.save()
     listeners.archive_callback(reg)
     mock_send_success.assert_not_called()
     for addon in ['dropbox', 'osfstorage']:
         rchild2.archive_job.update_target(addon, ARCHIVER_SUCCESS)
     rchild2.save()
     listeners.archive_callback(rchild2)
     mock_send_success.assert_called_with(reg)
开发者ID:bdyetton,项目名称:osf.io,代码行数:28,代码来源:test_archiver.py


示例13: test_archive_callback_on_tree_sends_only_one_email

 def test_archive_callback_on_tree_sends_only_one_email(self, mock_send_success, mock_arhive_success):
     proj = factories.NodeFactory()
     child = factories.NodeFactory(parent=proj)
     factories.NodeFactory(parent=child)
     reg = factories.RegistrationFactory(project=proj)
     rchild = reg._nodes.first()
     rchild2 = rchild._nodes.first()
     for node in [reg, rchild, rchild2]:
         node.archive_job._set_target('osfstorage')
     for node in [reg, rchild, rchild2]:
         node.archive_job.update_target('osfstorage', ARCHIVER_INITIATED)
     rchild.archive_job.update_target('osfstorage', ARCHIVER_SUCCESS)
     rchild.save()
     listeners.archive_callback(rchild)
     assert_false(mock_send_success.called)
     reg.archive_job.update_target('osfstorage', ARCHIVER_SUCCESS)
     reg.save()
     listeners.archive_callback(reg)
     assert_false(mock_send_success.called)
     rchild2.archive_job.update_target('osfstorage', ARCHIVER_SUCCESS)
     rchild2.save()
     listeners.archive_callback(rchild2)
     assert_equal(mock_send_success.call_count, 1)
     assert_true(mock_send_success.called)
开发者ID:adlius,项目名称:osf.io,代码行数:24,代码来源:test_archiver.py


示例14: test_archive_callback_done_success

 def test_archive_callback_done_success(self, mock_send):
     for addon in ['osfstorage', 'dropbox']:
         self.dst.archive_job.update_target(addon, ARCHIVER_SUCCESS)
     self.dst.archive_job.save()
     listeners.archive_callback(self.dst)
     mock_send.assert_called_with(self.dst)
开发者ID:bdyetton,项目名称:osf.io,代码行数:6,代码来源:test_archiver.py


示例15: mock_archive

def mock_archive(project, schema=None, auth=None, data=None, parent=None,
                 embargo=False, embargo_end_date=None,
                 retraction=False, justification=None, autoapprove_retraction=False,
                 autocomplete=True, autoapprove=False):
    """ A context manager for registrations. When you want to call Node#register_node in
    a test but do not want to deal with any of this side effects of archiver, this
    helper allows for creating a registration in a safe fashion.

    :param bool embargo: embargo the registration (rather than RegistrationApproval)
    :param bool autocomplete: automatically finish archival?
    :param bool autoapprove: automatically approve registration approval?
    :param bool retraction: retract the registration?
    :param str justification: a justification for the retraction
    :param bool autoapprove_retraction: automatically approve retraction?

    Example use:

    project = ProjectFactory()
    with mock_archive(project) as registration:
        assert_true(registration.is_registration)
        assert_true(registration.archiving)
        assert_true(registration.is_pending_registration)

    with mock_archive(project, autocomplete=True) as registration:
        assert_true(registration.is_registration)
        assert_false(registration.archiving)
        assert_true(registration.is_pending_registration)

    with mock_archive(project, autocomplete=True, autoapprove=True) as registration:
        assert_true(registration.is_registration)
        assert_false(registration.archiving)
        assert_false(registration.is_pending_registration)
    """
    schema = schema or get_default_metaschema()
    auth = auth or Auth(project.creator)
    data = data or ''

    with mock.patch('framework.celery_tasks.handlers.enqueue_task'):
        registration = project.register_node(
            schema=schema,
            auth=auth,
            data=data,
            parent=parent,
        )
    if embargo:
        embargo_end_date = embargo_end_date or (
            timezone.now() + dt.timedelta(days=20)
        )
        registration.embargo_registration(
            project.creator,
            embargo_end_date
        )
    else:
        registration.require_approval(project.creator)
    if autocomplete:
        root_job = registration.archive_job
        root_job.status = ARCHIVER_SUCCESS
        root_job.sent = False
        root_job.done = True
        root_job.save()
        sanction = registration.sanction
        with contextlib.nested(
            mock.patch.object(root_job, 'archive_tree_finished', mock.Mock(return_value=True)),
            mock.patch('website.archiver.tasks.archive_success.delay', mock.Mock())
        ):
            archiver_listeners.archive_callback(registration)
    if autoapprove:
        sanction = registration.sanction
        sanction.state = Sanction.APPROVED
        sanction.save()
        sanction._on_complete(project.creator)
        sanction.save()

    if retraction:
        justification = justification or 'Because reasons'
        registration.refresh_from_db()
        retraction = registration.retract_registration(project.creator, justification=justification)
        if autoapprove_retraction:
            retraction.state = Sanction.APPROVED
            retraction._on_complete(project.creator)
        retraction.save()
        registration.save()
    yield registration
开发者ID:adlius,项目名称:osf.io,代码行数:83,代码来源:utils.py


示例16: test_archive_callback_done_success

 def test_archive_callback_done_success(self, mock_send, mock_archive_success):
     for addon in ['osfstorage', 'dropbox']:
         self.dst.archive_job.update_target(addon, ARCHIVER_SUCCESS)
     self.dst.archive_job.save()
     listeners.archive_callback(self.dst)
     assert_equal(mock_send.call_count, 1)
开发者ID:atelic,项目名称:osf.io,代码行数:6,代码来源:test_archiver.py



注:本文中的website.archiver.listeners.archive_callback函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python model.ArchiveJob类代码示例发布时间:2022-05-26
下一篇:
Python app.init_app函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap