本文整理汇总了Python中website.archiver.listeners.archive_callback函数的典型用法代码示例。如果您正苦于以下问题:Python archive_callback函数的具体用法?Python archive_callback怎么用?Python archive_callback使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了archive_callback函数的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_archive_callback_done_errors
def test_archive_callback_done_errors(self):
    """Check that the callback invokes the failure handler when a target failed.

    Marks the 'dropbox' target as succeeded and the 'osfstorage' target as
    failed, saves the job, runs ``archive_callback`` with
    ``handle_archive_fail`` patched, and asserts the patched handler was called.
    """
    self.dst.archive_job.update_target('dropbox', ARCHIVER_SUCCESS)
    self.dst.archive_job.update_target('osfstorage', ARCHIVER_FAILURE)
    self.dst.archive_job.save()
    with mock.patch('website.archiver.utils.handle_archive_fail') as mock_fail:
        listeners.archive_callback(self.dst)
    # The previous check was ``assert(mock_fail.called_with(...))``.
    # ``called_with`` is not part of the Mock API: attribute access creates a
    # child mock whose return value is always truthy, so that assertion could
    # never fail. ``called`` is a real attribute and actually verifies the call.
    # TODO(review): once the expected arguments are confirmed against the
    # listener, tighten this to ``mock_fail.assert_called_with(...)``. The old
    # (never enforced) expectation was (ARCHIVER_NETWORK_ERROR, self.src,
    # self.dst, self.user, self.dst.archive_job.target_addons).
    assert mock_fail.called
开发者ID:bdyetton,项目名称:osf.io,代码行数:7,代码来源:test_archiver.py
示例2: test_archiving_nodes_not_added_to_search_on_archive_incomplete
def test_archiving_nodes_not_added_to_search_on_archive_incomplete(self, mock_send, mock_update_search):
    """The patched update_search must not be called while the tree is unfinished.

    ``ArchiveJob.archive_tree_finished`` is replaced with a mock returning
    False for the duration of the callback.
    """
    source_project = factories.ProjectFactory()
    registration = factories.RegistrationFactory(project=source_project)
    registration.save()
    tree_not_finished = mock.patch(
        'website.archiver.model.ArchiveJob.archive_tree_finished',
        mock.Mock(return_value=False),
    )
    with tree_not_finished:
        listeners.archive_callback(registration)
    assert_false(mock_update_search.called)
开发者ID:atelic,项目名称:osf.io,代码行数:7,代码来源:test_archiver.py
示例3: test_archive_callback_updates_archiving_state_when_done
def test_archive_callback_updates_archiving_state_when_done(self):
    """A child registration is no longer ``archiving`` after its callback runs.

    The parent registration's 'osfstorage' target is left at
    ARCHIVER_INITIATED while the child's is set to ARCHIVER_SUCCESS.
    """
    parent_node = factories.NodeFactory()
    factories.NodeFactory(parent=parent_node)
    parent_registration = factories.RegistrationFactory(project=parent_node)
    parent_registration.archive_job.update_target('osfstorage', ARCHIVER_INITIATED)

    child_registration = parent_registration.nodes[0]
    child_registration.archive_job.update_target('osfstorage', ARCHIVER_SUCCESS)
    child_registration.save()

    listeners.archive_callback(child_registration)

    assert_false(child_registration.archiving)
开发者ID:bdyetton,项目名称:osf.io,代码行数:10,代码来源:test_archiver.py
示例4: test_archiving_nodes_not_added_to_search_on_archive_failure
def test_archiving_nodes_not_added_to_search_on_archive_failure(self, mock_send, mock_delete_index_node):
    """The patched delete_index_node must be called when the finished job failed.

    ``ArchiveJob.archive_tree_finished`` is forced to return True and the
    ``ArchiveJob.success`` property is forced to False while the callback runs.
    """
    proj = factories.ProjectFactory()
    reg = factories.RegistrationFactory(project=proj)
    reg.save()
    # Plain nested ``with`` blocks replace ``contextlib.nested``, which is
    # deprecated since Python 2.7 and no longer exists in Python 3. The
    # ``as`` bindings were dropped because the patch objects were never used.
    with mock.patch('osf.models.archive.ArchiveJob.archive_tree_finished', mock.Mock(return_value=True)):
        with mock.patch('osf.models.archive.ArchiveJob.success', mock.PropertyMock(return_value=False)):
            listeners.archive_callback(reg)
    assert_true(mock_delete_index_node.called)
开发者ID:adlius,项目名称:osf.io,代码行数:10,代码来源:test_archiver.py
示例5: test_archiving_nodes_added_to_search_on_archive_success_if_public
def test_archiving_nodes_added_to_search_on_archive_success_if_public(self, mock_update_search, mock_send, mock_archive_success):
    """The patched update_search must be called exactly once on a successful job.

    ``ArchiveJob.archive_tree_finished`` is forced to return True and the
    ``ArchiveJob.success`` property is forced to True while the callback runs.
    """
    proj = factories.ProjectFactory()
    reg = factories.RegistrationFactory(project=proj)
    reg.save()
    # Plain nested ``with`` blocks replace ``contextlib.nested``, which is
    # deprecated since Python 2.7 and no longer exists in Python 3. The
    # ``as`` bindings were dropped because the patch objects were never used.
    with mock.patch('osf.models.ArchiveJob.archive_tree_finished', mock.Mock(return_value=True)):
        with mock.patch('osf.models.ArchiveJob.success', mock.PropertyMock(return_value=True)):
            listeners.archive_callback(reg)
    assert_equal(mock_update_search.call_count, 1)
开发者ID:adlius,项目名称:osf.io,代码行数:10,代码来源:test_archiver.py
示例6: mock_archive
def mock_archive(project, schema=None, auth=None, data=None, parent=None,
                 autocomplete=True, autoapprove=False):
    """ A context manager for registrations. When you want to call Node#register_node in
    a test but do not want to deal with any of the side effects of the archiver, this
    helper allows for creating a registration in a safe fashion.

    ``register_node`` is called with ``framework.tasks.handlers.enqueue_task``
    patched, after which approval is required on the registration's root.

    This function is a generator that yields the registration exactly once;
    the ``with`` usage below relies on it being wrapped by
    ``contextlib.contextmanager`` (NOTE(review): the decorator is not visible
    in this excerpt -- confirm it is applied where this is defined).

    :param project: node to register; ``project.creator`` is used for the
        default auth, for requiring approval and for completing the sanction
    :param schema: registration schema; falls back to DEFAULT_METASCHEMA
    :param auth: falls back to ``Auth(project.creator)``
    :param data: registration data; falls back to ``''``
    :param parent: forwarded unchanged to ``register_node``
    :param bool autocomplete: automatically finish archival?
    :param bool autoapprove: automatically approve registration approval?

    Example use:

    project = ProjectFactory()
    with mock_archive(project, autocomplete=False) as registration:
        assert_true(registration.is_registration)
        assert_true(registration.archiving)
        assert_true(registration.is_pending_registration)

    with mock_archive(project, autocomplete=True) as registration:
        assert_true(registration.is_registration)
        assert_false(registration.archiving)
        assert_true(registration.is_pending_registration)

    with mock_archive(project, autocomplete=True, autoapprove=True) as registration:
        assert_true(registration.is_registration)
        assert_false(registration.archiving)
        assert_false(registration.is_pending_registration)
    """
    schema = schema or DEFAULT_METASCHEMA
    auth = auth or Auth(project.creator)
    data = data or ''

    with mock.patch('framework.tasks.handlers.enqueue_task'):
        registration = project.register_node(
            schema=schema,
            auth=auth,
            data=data,
            parent=parent,
        )
    registration.root.require_approval(project.creator)
    if autocomplete:
        # Put the root job into a finished, successful, not-yet-sent state
        # before firing the callback by hand.
        root_job = registration.root.archive_job
        root_job.status = ARCHIVER_SUCCESS
        root_job.sent = False
        root_job.done = True
        root_job.save()
        sanction = registration.root.sanction
        # Plain nested ``with`` blocks replace ``contextlib.nested``, which
        # is deprecated since Python 2.7 and no longer exists in Python 3.
        with mock.patch.object(root_job, 'archive_tree_finished', mock.Mock(return_value=True)):
            with mock.patch.object(sanction, 'ask'):
                archiver_listeners.archive_callback(registration)
    if autoapprove:
        sanction = registration.root.sanction
        sanction._on_complete(project.creator)
    yield registration
开发者ID:Alpani,项目名称:osf.io,代码行数:55,代码来源:utils.py
示例7: test_archiving_nodes_not_added_to_search_on_archive_failure
def test_archiving_nodes_not_added_to_search_on_archive_failure(self, mock_send, mock_update_search):
    """The patched update_search must not be called when the finished job failed.

    While the callback runs, ``ArchiveJob.archive_tree_finished`` is forced to
    return True and the ``sent`` and ``success`` properties are forced to False.
    """
    proj = factories.ProjectFactory()
    reg = factories.RegistrationFactory(project=proj)
    reg.save()
    # Plain nested ``with`` blocks replace ``contextlib.nested``, which is
    # deprecated since Python 2.7 and no longer exists in Python 3. The
    # ``as`` bindings were dropped because the patch objects were never used.
    with mock.patch('website.archiver.model.ArchiveJob.archive_tree_finished', mock.Mock(return_value=True)):
        with mock.patch('website.archiver.model.ArchiveJob.sent', mock.PropertyMock(return_value=False)):
            with mock.patch('website.archiver.model.ArchiveJob.success', mock.PropertyMock(return_value=False)):
                listeners.archive_callback(reg)
    mock_update_search.assert_not_called()
开发者ID:Alpani,项目名称:osf.io,代码行数:11,代码来源:test_archiver.py
示例8: test_archive_callback_done_embargoed
def test_archive_callback_done_embargoed(self, mock_send, mock_archive_success):
    """The patched sender is called exactly once for an embargoed registration.

    The job's meta gets an 'embargo_urls' entry mapping every contributor id
    to None, the registration is embargoed for 30 days, and the 'osfstorage'
    target is marked successful before the callback runs.
    """
    embargo_urls = {}
    for contributor in self.dst.contributors:
        embargo_urls[contributor._id] = None
    # archive_job is deliberately re-read from self.dst on every use, as in
    # the code this replaces, rather than cached in a local.
    self.dst.archive_job.meta = {'embargo_urls': embargo_urls}

    embargo_end = timezone.now() + datetime.timedelta(days=30)
    self.dst.embargo_registration(self.user, embargo_end)
    self.dst.archive_job.update_target('osfstorage', ARCHIVER_SUCCESS)
    self.dst.save()

    listeners.archive_callback(self.dst)

    assert_equal(mock_send.call_count, 1)
开发者ID:adlius,项目名称:osf.io,代码行数:13,代码来源:test_archiver.py
示例9: test_archive_callback_done_embargoed
def test_archive_callback_done_embargoed(self, mock_send):
    """The patched sender is last called with ``(self.dst, self.user, urls=None)``.

    The job's meta gets an 'embargo_urls' entry mapping every contributor id
    to None, the registration is embargoed for 30 days, and both the
    'osfstorage' and 'dropbox' targets are marked successful before the
    callback runs.
    """
    embargo_urls = dict(
        (contributor._id, None) for contributor in self.dst.contributors
    )
    # archive_job is deliberately re-read from self.dst on every use, as in
    # the code this replaces, rather than cached in a local.
    self.dst.archive_job.meta = {'embargo_urls': embargo_urls}

    embargo_end = datetime.datetime.now() + datetime.timedelta(days=30)
    self.dst.embargo_registration(self.user, embargo_end)
    for finished_addon in ('osfstorage', 'dropbox'):
        self.dst.archive_job.update_target(finished_addon, ARCHIVER_SUCCESS)
    self.dst.save()

    listeners.archive_callback(self.dst)

    mock_send.assert_called_with(self.dst, self.user, urls=None)
开发者ID:bdyetton,项目名称:osf.io,代码行数:14,代码来源:test_archiver.py
示例10: test_archive_callback_pending
def test_archive_callback_pending(self):
    """Neither the success mail nor the failure handler fires in this setup.

    ``self.archive_job`` has both targets set to ARCHIVER_INITIATED, and only
    the 'osfstorage' target of ``self.dst.archive_job`` is then set to
    ARCHIVER_SUCCESS before the callback runs.
    """
    for pending_addon in ('osfstorage', 'dropbox'):
        self.archive_job.update_target(pending_addon, ARCHIVER_INITIATED)
    self.dst.archive_job.update_target('osfstorage', ARCHIVER_SUCCESS)
    self.dst.archive_job.save()

    with mock.patch('website.archiver.utils.send_archiver_success_mail') as mock_send, \
            mock.patch('website.archiver.utils.handle_archive_fail') as mock_fail:
        listeners.archive_callback(self.dst)

    assert_false(mock_send.called)
    assert_false(mock_fail.called)
开发者ID:bdyetton,项目名称:osf.io,代码行数:16,代码来源:test_archiver.py
示例11: test_archive_callback_pending
def test_archive_callback_pending(self, mock_delay):
    """No mail and no failure handling happen here, but ``mock_delay`` is called.

    ``self.archive_job`` has its 'osfstorage' target set to
    ARCHIVER_INITIATED, then the same target on ``self.dst.archive_job`` is
    set to ARCHIVER_SUCCESS before the callback runs.
    """
    self.archive_job.update_target('osfstorage', ARCHIVER_INITIATED)
    self.dst.archive_job.update_target('osfstorage', ARCHIVER_SUCCESS)
    self.dst.archive_job.save()

    with mock.patch('website.mails.send_mail') as mock_send, \
            mock.patch('website.archiver.utils.handle_archive_fail') as mock_fail:
        listeners.archive_callback(self.dst)

    assert_false(mock_send.called)
    assert_false(mock_fail.called)
    assert_true(mock_delay.called)
开发者ID:adlius,项目名称:osf.io,代码行数:16,代码来源:test_archiver.py
示例12: test_archive_callback_on_tree_sends_only_one_email
def test_archive_callback_on_tree_sends_only_one_email(self, mock_send_success):
    """The success sender fires only after the last node of the tree completes.

    A three-level registration tree is built. The middle node and then the
    root complete without the sender being called; completing the deepest
    node results in a call with the root registration.
    """
    addons = ('dropbox', 'osfstorage')

    top_project = factories.NodeFactory()
    middle_project = factories.NodeFactory(parent=top_project)
    factories.NodeFactory(parent=middle_project)
    reg = factories.RegistrationFactory(project=top_project)
    rchild = reg.nodes[0]
    rchild2 = rchild.nodes[0]
    tree = (reg, rchild, rchild2)

    # Two separate passes: every target is registered on every node before
    # any target is moved to ARCHIVER_INITIATED.
    for node in tree:
        for addon in addons:
            node.archive_job._set_target(addon)
    for node in tree:
        for addon in addons:
            node.archive_job.update_target(addon, ARCHIVER_INITIATED)

    def complete(node):
        # Mark every target successful, persist the node, fire the callback.
        for addon in addons:
            node.archive_job.update_target(addon, ARCHIVER_SUCCESS)
        node.save()
        listeners.archive_callback(node)

    complete(rchild)
    mock_send_success.assert_not_called()

    complete(reg)
    mock_send_success.assert_not_called()

    complete(rchild2)
    mock_send_success.assert_called_with(reg)
开发者ID:bdyetton,项目名称:osf.io,代码行数:28,代码来源:test_archiver.py
示例13: test_archive_callback_on_tree_sends_only_one_email
def test_archive_callback_on_tree_sends_only_one_email(self, mock_send_success, mock_arhive_success):
    """The success sender fires exactly once, after the last node completes.

    A three-level registration tree is built. The middle node and then the
    root complete without the sender being called; completing the deepest
    node brings the call count to one.
    """
    top_project = factories.NodeFactory()
    middle_project = factories.NodeFactory(parent=top_project)
    factories.NodeFactory(parent=middle_project)
    reg = factories.RegistrationFactory(project=top_project)
    rchild = reg._nodes.first()
    rchild2 = rchild._nodes.first()
    tree = (reg, rchild, rchild2)

    # Two separate passes: the target is registered on every node before
    # any node is moved to ARCHIVER_INITIATED.
    for node in tree:
        node.archive_job._set_target('osfstorage')
    for node in tree:
        node.archive_job.update_target('osfstorage', ARCHIVER_INITIATED)

    def complete(node):
        # Mark the target successful, persist the node, fire the callback.
        node.archive_job.update_target('osfstorage', ARCHIVER_SUCCESS)
        node.save()
        listeners.archive_callback(node)

    complete(rchild)
    assert_false(mock_send_success.called)

    complete(reg)
    assert_false(mock_send_success.called)

    complete(rchild2)
    assert_equal(mock_send_success.call_count, 1)
    assert_true(mock_send_success.called)
示例14: test_archive_callback_done_success
def test_archive_callback_done_success(self, mock_send):
    """With both targets successful, the patched sender is called with ``self.dst``."""
    for finished_addon in ('osfstorage', 'dropbox'):
        self.dst.archive_job.update_target(finished_addon, ARCHIVER_SUCCESS)
    self.dst.archive_job.save()

    listeners.archive_callback(self.dst)

    mock_send.assert_called_with(self.dst)
开发者ID:bdyetton,项目名称:osf.io,代码行数:6,代码来源:test_archiver.py
示例15: mock_archive
def mock_archive(project, schema=None, auth=None, data=None, parent=None,
                 embargo=False, embargo_end_date=None,
                 retraction=False, justification=None, autoapprove_retraction=False,
                 autocomplete=True, autoapprove=False):
    """ A context manager for registrations. When you want to call Node#register_node in
    a test but do not want to deal with any of the side effects of the archiver, this
    helper allows for creating a registration in a safe fashion.

    ``register_node`` is called with
    ``framework.celery_tasks.handlers.enqueue_task`` patched. The registration
    is then either embargoed or has approval required, depending on ``embargo``.

    This function is a generator that yields the registration exactly once;
    the ``with`` usage below relies on it being wrapped by
    ``contextlib.contextmanager`` (NOTE(review): the decorator is not visible
    in this excerpt -- confirm it is applied where this is defined).

    :param project: node to register; ``project.creator`` is used for the
        default auth and as the acting user for every sanction step
    :param schema: registration schema; falls back to ``get_default_metaschema()``
    :param auth: falls back to ``Auth(project.creator)``
    :param data: registration data; falls back to ``''``
    :param parent: forwarded unchanged to ``register_node``
    :param bool embargo: embargo the registration (rather than RegistrationApproval)
    :param embargo_end_date: end of the embargo; falls back to 20 days from now
    :param bool autocomplete: automatically finish archival?
    :param bool autoapprove: automatically approve registration approval?
    :param bool retraction: retract the registration?
    :param str justification: a justification for the retraction; falls back
        to ``'Because reasons'``
    :param bool autoapprove_retraction: automatically approve retraction?

    Example use:

    project = ProjectFactory()
    with mock_archive(project, autocomplete=False) as registration:
        assert_true(registration.is_registration)
        assert_true(registration.archiving)
        assert_true(registration.is_pending_registration)

    with mock_archive(project, autocomplete=True) as registration:
        assert_true(registration.is_registration)
        assert_false(registration.archiving)
        assert_true(registration.is_pending_registration)

    with mock_archive(project, autocomplete=True, autoapprove=True) as registration:
        assert_true(registration.is_registration)
        assert_false(registration.archiving)
        assert_false(registration.is_pending_registration)
    """
    schema = schema or get_default_metaschema()
    auth = auth or Auth(project.creator)
    data = data or ''

    with mock.patch('framework.celery_tasks.handlers.enqueue_task'):
        registration = project.register_node(
            schema=schema,
            auth=auth,
            data=data,
            parent=parent,
        )
    if embargo:
        embargo_end_date = embargo_end_date or (
            timezone.now() + dt.timedelta(days=20)
        )
        registration.embargo_registration(
            project.creator,
            embargo_end_date
        )
    else:
        registration.require_approval(project.creator)
    if autocomplete:
        # Put the job into a finished, successful, not-yet-sent state before
        # firing the callback by hand.
        root_job = registration.archive_job
        root_job.status = ARCHIVER_SUCCESS
        root_job.sent = False
        root_job.done = True
        root_job.save()
        # Plain nested ``with`` blocks replace ``contextlib.nested``, which
        # is deprecated since Python 2.7 and no longer exists in Python 3.
        with mock.patch.object(root_job, 'archive_tree_finished', mock.Mock(return_value=True)):
            with mock.patch('website.archiver.tasks.archive_success.delay', mock.Mock()):
                archiver_listeners.archive_callback(registration)
    if autoapprove:
        sanction = registration.sanction
        sanction.state = Sanction.APPROVED
        sanction.save()
        sanction._on_complete(project.creator)
        sanction.save()
    if retraction:
        justification = justification or 'Because reasons'
        registration.refresh_from_db()
        # Held in its own local so the boolean ``retraction`` flag is not
        # rebound to a different kind of object.
        retraction_sanction = registration.retract_registration(project.creator, justification=justification)
        if autoapprove_retraction:
            retraction_sanction.state = Sanction.APPROVED
            retraction_sanction._on_complete(project.creator)
        retraction_sanction.save()
        registration.save()
    yield registration
开发者ID:adlius,项目名称:osf.io,代码行数:83,代码来源:utils.py
示例16: test_archive_callback_done_success
def test_archive_callback_done_success(self, mock_send, mock_archive_success):
    """With both targets successful, the patched sender is called exactly once."""
    for finished_addon in ('osfstorage', 'dropbox'):
        self.dst.archive_job.update_target(finished_addon, ARCHIVER_SUCCESS)
    self.dst.archive_job.save()

    listeners.archive_callback(self.dst)

    assert_equal(mock_send.call_count, 1)
开发者ID:atelic,项目名称:osf.io,代码行数:6,代码来源:test_archiver.py
注:本文中的website.archiver.listeners.archive_callback函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论