• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.generate_notification函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中qonos.common.utils.generate_notification函数的典型用法代码示例。如果您正苦于以下问题：Python generate_notification函数的具体用法？Python generate_notification怎么用？Python generate_notification使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了generate_notification函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_generate_notification

    def test_generate_notification(self):
        """Check what generate_notification hands over to the notifier API.

        ``notifier_api.publisher_id`` and ``notifier_api.notify`` are replaced
        by local fakes; the fake ``notify`` records every argument it receives
        so the test can compare them with the expected notification.
        """
        captured = {}

        def stub_publisher_id(service, host=None):
            # A missing (falsy) host falls back to 'localhost'.
            return "%s.%s" % (service, host or 'localhost')

        def stub_notify(context, publisher_id, event_type, priority, payload):
            # Keep each argument under its own name for the final comparison.
            captured.update(context=context,
                            publisher_id=publisher_id,
                            event_type=event_type,
                            priority=priority,
                            payload=payload)

        self.stubs.Set(notifier_api, 'publisher_id', stub_publisher_id)
        self.stubs.Set(notifier_api, 'notify', stub_notify)
        event_payload = {'id': 'fake-id'}
        # Nothing may have been recorded before the call under test.
        self.assertEqual(captured, {})
        self.config(host='localhost')
        utils.generate_notification(None, 'qonos.fake.event', event_payload)

        expected = {
            'context': None,
            'publisher_id': 'qonos.localhost',
            'event_type': 'qonos.fake.event',
            'priority': 'INFO',
            'payload': {'id': 'fake-id'},
        }
        self.assertEqual(captured, expected)
开发者ID:amalaba,项目名称:qonos,代码行数:31,代码来源:test_utils.py


示例2: test_doesnt_delete_images_on_retention_error

    def test_doesnt_delete_images_on_retention_error(self):
        """Process a snapshot job whose retention lookup raises an exception.

        No ``images.list`` / ``images.delete`` expectation is recorded here,
        and the job is still expected to be updated to 'DONE'.
        """
        timeutils.set_time_override()
        # --- record phase: expected nova calls ---
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(mox.IsA(str),
            mox.IsA(str), self.snapshot_meta).AndReturn(IMAGE_ID)
        self.nova_client.images.get(IMAGE_ID).AndReturn(
            MockImageStatus('ACTIVE'))
        # The retention lookup is the call that fails in this scenario.
        self.nova_client.rax_scheduled_images_python_novaclient_ext.\
            get(mox.IsA(str)).AndRaise(Exception())
        # NOTE(review): mock_server is never referenced again in this test --
        # confirm whether it can be dropped.
        mock_server = MockServer(retention=None)
        self._init_worker_mock()
        # Replace the real notifier and record the two expected notifications.
        self.mox.StubOutWithMock(utils, 'generate_notification')
        utils.generate_notification(None, 'qonos.job.run.start', mox.IsA(dict),
                                    mox.IsA(str))
        utils.generate_notification(None, 'qonos.job.run.end', mox.IsA(dict),
                                    mox.IsA(str))
        self.worker.get_qonos_client().AndReturn(self.qonos_client)
        self.qonos_client.delete_schedule(mox.IsA(str))
        self.worker.update_job(fakes.JOB_ID, 'DONE', timeout=None,
                               error_message=None)
        # --- replay phase: run the code under test ---
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)

        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:amalaba,项目名称:qonos,代码行数:28,代码来源:test_snapshot.py


示例3: create

    def create(self, request, body):
        """Create a job from an existing schedule and emit a notification.

        :param request: the incoming request (not read by this method)
        :param body: dict expected to contain a ``'job'`` dict with a
                     ``'schedule_id'`` and, optionally, ``'next_run'``
        :returns: ``{'job': <job returned by db_api.job_create>}``
        :raises webob.exc.HTTPBadRequest: ``body``, ``body['job']`` or the
            job's ``schedule_id`` is missing
        :raises webob.exc.HTTPNotFound: the schedule lookup raised NotFound
        :raises webob.exc.HTTPConflict: ``schedule_test_and_set_next_run``
            raised NotFound
        """
        if (body is None or body.get('job') is None or
                body['job'].get('schedule_id') is None):
            raise webob.exc.HTTPBadRequest()
        job = body['job']

        try:
            schedule = self.db_api.schedule_get_by_id(job['schedule_id'])
        except exception.NotFound:
            raise webob.exc.HTTPNotFound()

        # Check integrity of schedule and update next run
        expected_next_run = job.get('next_run')
        if expected_next_run:
            # Reuse the value already fetched instead of a second lookup.
            expected_next_run = timeutils.parse_isotime(expected_next_run)

        next_run = api_utils.schedule_to_next_run(schedule, timeutils.utcnow())
        try:
            self.db_api.schedule_test_and_set_next_run(
                schedule['id'], expected_next_run, next_run)
        except exception.NotFound:
            # The adjacent literals are concatenated, so each fragment needs
            # its own separating space (the original rendered "changedor").
            msg = _("Specified next run does not match the current next run"
                    " value. This could mean schedule has either changed"
                    " or has already been scheduled since you last expected.")
            raise webob.exc.HTTPConflict(explanation=msg)

        # Update schedule last_scheduled
        self.db_api.schedule_update(
            schedule['id'], {'last_scheduled': timeutils.utcnow()})

        # Create job: start from the submitted job, then apply the fields
        # that always come from the schedule.
        values = {}
        values.update(job)
        values['tenant'] = schedule['tenant']
        values['action'] = schedule['action']
        values['status'] = 'QUEUED'
        values['job_metadata'] = [
            {'key': metadata['key'], 'value': metadata['value']}
            for metadata in schedule['schedule_metadata']
        ]

        job_action = values['action']
        if 'timeout' not in values:
            # Both timeouts are only defaulted when the caller gave no
            # 'timeout' at all.
            values['timeout'] = api_utils.get_new_timeout_by_action(job_action)
            values['hard_timeout'] = \
                api_utils.get_new_timeout_by_action(job_action)

        job = self.db_api.job_create(values)
        utils.serialize_datetimes(job)
        api_utils.serialize_job_metadata(job)
        job = {'job': job}
        utils.generate_notification(None, 'qonos.job.create', job, 'INFO')
        return job
开发者ID:pperezrubio,项目名称:qonos,代码行数:60,代码来源:jobs.py


示例4: test_process_job_should_not_create_on_job_error_image_ok

    def test_process_job_should_not_create_on_job_error_image_ok(self):
        """Process a job in ERROR status that already carries an image id.

        No ``servers.create_image`` expectation is recorded; only status
        lookups for the existing image (SAVING, then ACTIVE) are expected.
        """
        timeutils.set_time_override()
        # Make the job look like a previously failed run that has an image.
        self.job['metadata']['image_id'] = IMAGE_ID
        self.job['status'] = 'ERROR'

        # Two consecutive status lookups: still saving, then finished.
        self.nova_client.images.get(IMAGE_ID).AndReturn(
            MockImageStatus('SAVING'))
        self.nova_client.images.get(IMAGE_ID).AndReturn(
            MockImageStatus('ACTIVE'))

        mock_retention = MockRetention()
        self.nova_client.rax_scheduled_images_python_novaclient_ext.\
            get(mox.IsA(str)).AndReturn(mock_retention)

        self._simple_prepare_worker_mock(skip_metadata_update=True)

        self.worker.update_job(fakes.JOB_ID, 'DONE', timeout=None,
                               error_message=None)
        # Any number of notifications with a None context is accepted here.
        self.mox.StubOutWithMock(utils, 'generate_notification')
        utils.generate_notification(None, mox.IsA(str), mox.IsA(dict),
                                    mox.IsA(str)).MultipleTimes()
        # Switch from recording expectations to replaying them.
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)

        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:clefelhocz,项目名称:qonos,代码行数:29,代码来源:test_snapshot.py


示例5: test_deletes_images_more_than_retention

    def test_deletes_images_more_than_retention(self):
        """Expect the last two of five listed images to be deleted.

        The retention mock is built with 3 and the image list holds 5 entries
        for the job's instance; deletes are expected for the final two.
        """
        timeutils.set_time_override()
        instance_id = self.job["metadata"]["instance_id"]
        # --- record phase: expected nova calls ---
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(mox.IsA(str), mox.IsA(str), self.snapshot_meta).AndReturn(IMAGE_ID)
        self.nova_client.images.get(IMAGE_ID).AndReturn(MockImageStatus("ACTIVE"))
        mock_retention = MockRetention(3)
        self.nova_client.rax_scheduled_images_python_novaclient_ext.get(mox.IsA(str)).AndReturn(mock_retention)
        # This server object is only used to supply the id for the image list.
        mock_server = MockServer(instance_id=instance_id)
        image_list = self._create_images_list(mock_server.id, 5)
        self.nova_client.images.list(detailed=True).AndReturn(image_list)
        # The image list happens to be in descending created order
        self.nova_client.images.delete(image_list[-2].id)
        self.nova_client.images.delete(image_list[-1].id)
        self._init_worker_mock()
        self.worker.update_job(fakes.JOB_ID, "DONE", timeout=None, error_message=None)
        # Replace the real notifier and record the two expected notifications.
        self.mox.StubOutWithMock(utils, "generate_notification")
        utils.generate_notification(None, "qonos.job.run.start", mox.IsA(dict), mox.IsA(str))
        utils.generate_notification(None, "qonos.job.run.end", mox.IsA(dict), mox.IsA(str))
        # --- replay phase: run the code under test ---
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)

        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:hmakkapati,项目名称:qonos,代码行数:27,代码来源:test_snapshot.py


示例6: test_process_job_should_cancel_if_schedule_deleted

    def test_process_job_should_cancel_if_schedule_deleted(self):
        """Expect a CANCELLED update and a 'qonos.job.failed' notification.

        The scenario setup is done by ``_init_qonos_client`` (not shown in
        this block); this test only pins the resulting worker update and the
        exact payload of the failure notification.
        """
        self._init_qonos_client()
        self.mox.StubOutWithMock(utils, 'generate_notification')
        utils.generate_notification(None, 'qonos.job.run.start', mox.IsA(dict),
                                    mox.IsA(str))
        # Value the mocked update_job hands back for the CANCELLED update.
        response = {'status': 'CANCELLED', 'timeout': self.job['timeout']}
        self.worker.update_job(fakes.JOB_ID, 'CANCELLED', timeout=None,
                               error_message=mox.IsA(str)).AndReturn(response)
        # Full job payload expected in the failure notification; status and
        # timeout match the mocked update_job response above.
        expected_payload = {'job': {'status': 'CANCELLED',
                   'hard_timeout': self.job['hard_timeout'],
                   'created_at': self.job['created_at'],
                   'modified_at': self.job['modified_at'],
                   'retry_count': 1,
                   'schedule_id': '33333333-3333-3333-3333-33333333',
                   'worker_id': '11111111-1111-1111-1111-11111111',
                   'timeout': self.job['timeout'],
                   'action': 'snapshot',
                   'id': '22222222-2222-2222-2222-22222222',
                   'tenant': '44444444-4444-4444-4444-44444444',
                   'metadata': {'instance_id':
                   '55555555-5555-5555-5555-55555555'}}}
        utils.generate_notification(None, 'qonos.job.failed', expected_payload,
                                    mox.IsA(str))
        # Switch from recording expectations to replaying them.
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)

        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:clefelhocz,项目名称:qonos,代码行数:31,代码来源:test_snapshot.py


示例7: test_doesnt_delete_images_from_another_instance

    def test_doesnt_delete_images_from_another_instance(self):
        """Expect deletes only for images created for the job's instance.

        The listed images mix five entries for the job's instance with three
        created for a freshly generated uuid; delete expectations are recorded
        only for two of the former.
        """
        timeutils.set_time_override()
        instance_id = self.job["metadata"]["instance_id"]
        # --- record phase: expected nova calls ---
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(mox.IsA(str), mox.IsA(str), self.snapshot_meta).AndReturn(IMAGE_ID)
        self.nova_client.images.get(IMAGE_ID).AndReturn(MockImageStatus("ACTIVE"))
        mock_retention = MockRetention(3)
        self.nova_client.rax_scheduled_images_python_novaclient_ext.get(mox.IsA(str)).AndReturn(mock_retention)
        mock_server = MockServer(instance_id=instance_id)
        image_list = self._create_images_list(mock_server.id, 5)
        # Sliced before the list is extended, so this holds only the last two
        # of the five images built for the job's instance.
        to_delete = image_list[3:]
        # Append three images created for a different, random id.
        image_list.extend(self._create_images_list(uuidutils.generate_uuid(), 3))
        self.nova_client.images.list(detailed=True).AndReturn(image_list)
        # The image list happens to be in descending created order
        self.nova_client.images.delete(to_delete[0].id)
        self.nova_client.images.delete(to_delete[1].id)
        self._simple_prepare_worker_mock()
        # Replace the real notifier and record the two expected notifications.
        self.mox.StubOutWithMock(utils, "generate_notification")
        utils.generate_notification(None, "qonos.job.run.start", mox.IsA(dict), mox.IsA(str))
        utils.generate_notification(None, "qonos.job.run.end", mox.IsA(dict), mox.IsA(str))
        # --- replay phase: run the code under test ---
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)
        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:hmakkapati,项目名称:qonos,代码行数:27,代码来源:test_snapshot.py


示例8: test_process_job_should_succeed_after_multiple_tries

    def test_process_job_should_succeed_after_multiple_tries(self):
        """Expect four image status lookups before the image turns ACTIVE.

        The recorded lookups return QUEUED, SAVING, SAVING and finally ACTIVE,
        followed by the retention lookup and start/end notifications.
        """
        timeutils.set_time_override()

        # --- record phase: expected nova calls, in order ---
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(
            mox.IsA(str), mox.IsA(str),
            self.snapshot_meta).AndReturn(IMAGE_ID)
        for image_status in ('QUEUED', 'SAVING', 'SAVING', 'ACTIVE'):
            self.nova_client.images.get(IMAGE_ID).AndReturn(
                MockImageStatus(image_status))
        retention = MockRetention()
        self.nova_client.rax_scheduled_images_python_novaclient_ext.get(
            mox.IsA(str)).AndReturn(retention)
        self._simple_prepare_worker_mock()

        # Replace the real notifier and record one start and one end event.
        self.mox.StubOutWithMock(utils, 'generate_notification')
        for event_type in ('qonos.job.run.start', 'qonos.job.run.end'):
            utils.generate_notification(None, event_type, mox.IsA(dict),
                                        mox.IsA(str))

        # --- replay phase: run the code under test ---
        self.mox.ReplayAll()
        snapshot_processor = TestableSnapshotProcessor(self.nova_client)
        snapshot_processor.init_processor(self.worker)
        snapshot_processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:pperezrubio,项目名称:qonos,代码行数:30,代码来源:test_snapshot.py


示例9: _do_test_process_job_should_update_image_error

    def _do_test_process_job_should_update_image_error(self, error_status):
        """Shared driver: image polling that ends with ``error_status``.

        :param error_status: object returned (unwrapped) by the final
                             ``images.get`` expectation
        """
        base_time = timeutils.utcnow()
        # Two reads at the base time, then one every 300s starting at +305s.
        time_seq = [base_time, base_time] + [
            base_time + datetime.timedelta(seconds=offset)
            for offset in (305, 605, 905, 1205, 1505)
        ]
        timeutils.set_time_override_seq(time_seq)

        # Work on a copy so the shared fixture job keeps its own timeout.
        job = copy.deepcopy(self.job)
        job['timeout'] = base_time + datetime.timedelta(minutes=60)

        # --- record phase: expected nova calls, in order ---
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(
            mox.IsA(str), mox.IsA(str),
            self.snapshot_meta).AndReturn(IMAGE_ID)
        for in_progress in ('QUEUED', 'SAVING', 'SAVING', 'SAVING'):
            self.nova_client.images.get(IMAGE_ID).AndReturn(
                MockImageStatus(in_progress))
        self.nova_client.images.get(IMAGE_ID).AndReturn(error_status)

        # Four PROCESSING updates are expected before the final ERROR update.
        self._init_worker_mock()
        for attempt in range(4):
            self.worker.update_job(fakes.JOB_ID, 'PROCESSING', timeout=None,
                                   error_message=None)
        self.worker.update_job(fakes.JOB_ID, 'ERROR', timeout=None,
                               error_message=mox.IsA(str))

        # Only the start notification is recorded for this scenario.
        self.mox.StubOutWithMock(utils, 'generate_notification')
        utils.generate_notification(None, 'qonos.job.run.start', mox.IsA(dict),
                                    mox.IsA(str))

        # --- replay phase: run the code under test ---
        self.mox.ReplayAll()
        snapshot_processor = TestableSnapshotProcessor(self.nova_client)
        snapshot_processor.init_processor(self.worker)
        snapshot_processor.process_job(job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:pperezrubio,项目名称:qonos,代码行数:53,代码来源:test_snapshot.py


示例10: test_process_job_should_cancel_if_schedule_deleted

    def test_process_job_should_cancel_if_schedule_deleted(self):
        """Expect a start notification followed by a CANCELLED job update.

        The scenario setup is done by ``_init_qonos_client`` (not shown in
        this block).
        """
        self._init_qonos_client()
        # Replace the real notifier; only the start event is recorded.
        self.mox.StubOutWithMock(utils, "generate_notification")
        utils.generate_notification(None, "qonos.job.run.start", mox.IsA(dict), mox.IsA(str))
        # The cancellation must carry some string error message.
        self.worker.update_job(fakes.JOB_ID, "CANCELLED", timeout=None, error_message=mox.IsA(str))
        # Switch from recording expectations to replaying them.
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)

        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:hmakkapati,项目名称:qonos,代码行数:13,代码来源:test_snapshot.py


示例11: test_process_job_should_update_status_and_timestamp

    def test_process_job_should_update_status_and_timestamp(self):
        """Expect one PROCESSING update with a new timeout and one without.

        The overridden clock jumps past the job's 60-minute timeout while the
        image is still being polled; the job is finally expected to be DONE.
        """
        base_time = timeutils.utcnow()
        one_hour = datetime.timedelta(minutes=60)
        time_seq = [
            base_time,
            base_time,
            base_time + datetime.timedelta(seconds=305),
            base_time + one_hour + datetime.timedelta(seconds=5),
            base_time + one_hour + datetime.timedelta(seconds=305),
        ]
        timeutils.set_time_override_seq(time_seq)

        # Work on a copy so the shared fixture job keeps its own timeout.
        job = copy.deepcopy(self.job)
        job['timeout'] = base_time + one_hour

        # --- record phase: expected nova calls, in order ---
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(
            mox.IsA(str), mox.IsA(str),
            self.snapshot_meta).AndReturn(IMAGE_ID)
        for image_status in ('QUEUED', 'SAVING', 'SAVING', 'ACTIVE'):
            self.nova_client.images.get(IMAGE_ID).AndReturn(
                MockImageStatus(image_status))
        retention = MockRetention()
        self.nova_client.rax_scheduled_images_python_novaclient_ext.get(
            mox.IsA(str)).AndReturn(retention)

        self._init_worker_mock()
        # First PROCESSING update carries a datetime timeout, the second none.
        self.worker.update_job(
            fakes.JOB_ID, 'PROCESSING',
            timeout=mox.IsA(datetime.datetime), error_message=None)
        self.worker.update_job(
            fakes.JOB_ID, 'PROCESSING', timeout=None, error_message=None)

        # Replace the real notifier and record one start and one end event.
        self.mox.StubOutWithMock(utils, 'generate_notification')
        for event_type in ('qonos.job.run.start', 'qonos.job.run.end'):
            utils.generate_notification(None, event_type, mox.IsA(dict),
                                        mox.IsA(str))
        self.worker.get_qonos_client().AndReturn(self.qonos_client)
        self.qonos_client.delete_schedule(mox.IsA(str))
        self.worker.update_job(
            fakes.JOB_ID, 'DONE', timeout=None, error_message=None)

        # --- replay phase: run the code under test ---
        self.mox.ReplayAll()
        snapshot_processor = TestableSnapshotProcessor(self.nova_client)
        snapshot_processor.init_processor(self.worker)
        snapshot_processor.process_job(job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:amalaba,项目名称:qonos,代码行数:51,代码来源:test_snapshot.py


示例12: test_process_job_should_cancel_if_no_instance_id

    def test_process_job_should_cancel_if_no_instance_id(self):
        """Expect PROCESSING then CANCELLED when the job has no instance_id.

        ``instance_id`` is removed from the job's metadata just before the job
        is processed.
        """
        self._init_qonos_client(schedule=MockSchedule())
        # Replace the real notifier; only the start event is recorded.
        self.mox.StubOutWithMock(utils, "generate_notification")
        utils.generate_notification(None, "qonos.job.run.start", mox.IsA(dict), mox.IsA(str))
        self.worker.update_job(fakes.JOB_ID, "PROCESSING", timeout=mox.IsA(datetime.datetime), error_message=None)
        # The cancellation must carry some string error message.
        self.worker.update_job(fakes.JOB_ID, "CANCELLED", timeout=None, error_message=mox.IsA(str))
        # Switch from recording expectations to replaying them.
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)
        # Drop the instance id so the job under test lacks it.
        del self.job["metadata"]["instance_id"]
        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:hmakkapati,项目名称:qonos,代码行数:14,代码来源:test_snapshot.py


示例13: test_process_job_should_cancel_if_instance_not_found

    def test_process_job_should_cancel_if_instance_not_found(self):
        """Expect cancellation and schedule deletion when create_image 404s.

        ``servers.create_image`` is recorded to raise ``exceptions.NotFound``;
        the job is then expected to be CANCELLED and ``delete_schedule`` to be
        called on the qonos client.
        """
        self._init_qonos_client(schedule=MockSchedule())
        # Replace the real notifier; only the start event is recorded.
        self.mox.StubOutWithMock(utils, "generate_notification")
        utils.generate_notification(None, "qonos.job.run.start", mox.IsA(dict), mox.IsA(str))
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        # The image creation call is the one that fails in this scenario.
        self.nova_client.servers.create_image(mox.IsA(str), mox.IsA(str), self.snapshot_meta).AndRaise(
            exceptions.NotFound("404")
        )
        self.worker.update_job(fakes.JOB_ID, "PROCESSING", timeout=mox.IsA(datetime.datetime), error_message=None)
        self.worker.update_job(fakes.JOB_ID, "CANCELLED", timeout=None, error_message=mox.IsA(str))
        self.worker.get_qonos_client().AndReturn(self.qonos_client)
        self.qonos_client.delete_schedule(mox.IsA(str))
        # Switch from recording expectations to replaying them.
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)
        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:hmakkapati,项目名称:qonos,代码行数:19,代码来源:test_snapshot.py


示例14: test_process_job_should_succeed_immediately

    def test_process_job_should_succeed_immediately(self):
        """Expect a single image status lookup that already returns ACTIVE.

        Start and end notifications plus a ``delete_schedule`` call on the
        qonos client are expected as well.
        """
        timeutils.set_time_override()
        # --- record phase: expected nova calls ---
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(mox.IsA(str), mox.IsA(str), self.snapshot_meta).AndReturn(IMAGE_ID)
        # The very first status lookup reports the image as ACTIVE.
        self.nova_client.images.get(IMAGE_ID).AndReturn(MockImageStatus("ACTIVE"))
        mock_retention = MockRetention()
        self.nova_client.rax_scheduled_images_python_novaclient_ext.get(mox.IsA(str)).AndReturn(mock_retention)
        self._simple_prepare_worker_mock()

        # Replace the real notifier and record the two expected notifications.
        self.mox.StubOutWithMock(utils, "generate_notification")
        utils.generate_notification(None, "qonos.job.run.start", mox.IsA(dict), mox.IsA(str))
        utils.generate_notification(None, "qonos.job.run.end", mox.IsA(dict), mox.IsA(str))
        self.worker.get_qonos_client().AndReturn(self.qonos_client)
        self.qonos_client.delete_schedule(mox.IsA(str))
        # --- replay phase: run the code under test ---
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)

        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:hmakkapati,项目名称:qonos,代码行数:22,代码来源:test_snapshot.py


示例15: test_process_job_new_image_when_retrying_with_failed_image

    def test_process_job_new_image_when_retrying_with_failed_image(self):
        """Expect a new image when checking the previous image raises.

        The job starts in ERROR status with an image id; the first
        ``images.get`` raises, after which ``servers.create_image`` is
        expected, and a 'qonos.job.retry' notification replaces the usual
        start notification.
        """
        timeutils.set_time_override()
        # Make the job look like a previously failed run that has an image.
        self.job['metadata']['image_id'] = IMAGE_ID
        self.job['status'] = 'ERROR'

        # Make the status check on the previous image raise an error.
        self.nova_client.images.get(IMAGE_ID).AndRaise(
            Exception())

        # After the failed check a fresh snapshot is expected.
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(mox.IsA(str),
            mox.IsA(str), self.snapshot_meta).AndReturn(IMAGE_ID)
        self.nova_client.images.get(IMAGE_ID).AndReturn(
            MockImageStatus('ACTIVE'))
        mock_retention = MockRetention()
        self.nova_client.rax_scheduled_images_python_novaclient_ext.\
            get(mox.IsA(str)).AndReturn(mock_retention)
        self._simple_prepare_worker_mock()

        # Replace the real notifier: a retry event, then the end event.
        self.mox.StubOutWithMock(utils, 'generate_notification')
        utils.generate_notification(None, 'qonos.job.retry', mox.IsA(dict),
                                    mox.IsA(str))
        utils.generate_notification(None, 'qonos.job.run.end', mox.IsA(dict),
                                    mox.IsA(str))
        self.worker.get_qonos_client().AndReturn(self.qonos_client)
        self.qonos_client.delete_schedule(mox.IsA(str))
        self.worker.update_job(fakes.JOB_ID, 'DONE',
                error_message=None, timeout=None).AndReturn(None)

        # Switch from recording expectations to replaying them.
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)

        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:amalaba,项目名称:qonos,代码行数:37,代码来源:test_snapshot.py


示例16: test_process_job_should_update_status_timestamp_no_retries

    def test_process_job_should_update_status_timestamp_no_retries(self):
        """Expect three PROCESSING updates and then a TIMED_OUT update.

        The image never leaves SAVING while the overridden clock advances by
        hours; only the start notification is recorded.
        """
        base_time = timeutils.utcnow()
        # Two reads at the base time, then five later reads at N minutes + 5s.
        time_seq = [base_time, base_time] + [
            base_time + datetime.timedelta(minutes=elapsed, seconds=5)
            for elapsed in (5, 60, 120, 180, 240)
        ]
        timeutils.set_time_override_seq(time_seq)

        # Work on a copy so the shared fixture job keeps its own timeout.
        job = copy.deepcopy(self.job)
        job["timeout"] = base_time + datetime.timedelta(minutes=60)

        # --- record phase: expected nova calls, in order ---
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(
            mox.IsA(str), mox.IsA(str),
            self.snapshot_meta).AndReturn(IMAGE_ID)
        self.nova_client.images.get(IMAGE_ID).AndReturn(
            MockImageStatus("QUEUED"))
        # Every later lookup, however many there are, reports SAVING.
        self.nova_client.images.get(IMAGE_ID).MultipleTimes().AndReturn(
            MockImageStatus("SAVING"))

        self._init_worker_mock()
        for attempt in range(3):
            self.worker.update_job(
                fakes.JOB_ID, "PROCESSING",
                timeout=mox.IsA(datetime.datetime), error_message=None)
        self.worker.update_job(
            fakes.JOB_ID, "TIMED_OUT", timeout=None, error_message=None)

        # Replace the real notifier; only the start event is recorded.
        self.mox.StubOutWithMock(utils, "generate_notification")
        utils.generate_notification(
            None, "qonos.job.run.start", mox.IsA(dict), mox.IsA(str))

        # --- replay phase: run the code under test ---
        self.mox.ReplayAll()
        snapshot_processor = TestableSnapshotProcessor(self.nova_client)
        snapshot_processor.init_processor(self.worker)
        snapshot_processor.process_job(job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:hmakkapati,项目名称:qonos,代码行数:36,代码来源:test_snapshot.py


示例17: test_process_job_should_update_status_only

    def test_process_job_should_update_status_only(self):
        """Expect a successful run whose polling stays inside the timeout.

        The overridden clock advances to at most +905s while the job timeout
        is set 60 minutes after the base time.
        """
        base_time = timeutils.utcnow()
        # Two reads at the base time, then one every 300s starting at +305s.
        time_seq = [base_time, base_time] + [
            base_time + datetime.timedelta(seconds=offset)
            for offset in (305, 605, 905)
        ]
        timeutils.set_time_override_seq(time_seq)

        # Work on a copy so the shared fixture job keeps its own timeout.
        job = copy.deepcopy(self.job)
        job["timeout"] = base_time + datetime.timedelta(minutes=60)

        # --- record phase: expected nova calls, in order ---
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(
            mox.IsA(str), mox.IsA(str),
            self.snapshot_meta).AndReturn(IMAGE_ID)
        for image_status in ("QUEUED", "SAVING", "SAVING", "ACTIVE"):
            self.nova_client.images.get(IMAGE_ID).AndReturn(
                MockImageStatus(image_status))
        retention = MockRetention()
        self.nova_client.rax_scheduled_images_python_novaclient_ext.get(
            mox.IsA(str)).AndReturn(retention)
        self._simple_prepare_worker_mock(2)

        # Replace the real notifier and record one start and one end event.
        self.mox.StubOutWithMock(utils, "generate_notification")
        for event_type in ("qonos.job.run.start", "qonos.job.run.end"):
            utils.generate_notification(
                None, event_type, mox.IsA(dict), mox.IsA(str))
        self.worker.get_qonos_client().AndReturn(self.qonos_client)
        self.qonos_client.delete_schedule(mox.IsA(str))

        # --- replay phase: run the code under test ---
        self.mox.ReplayAll()
        snapshot_processor = TestableSnapshotProcessor(self.nova_client)
        snapshot_processor.init_processor(self.worker)
        snapshot_processor.process_job(job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:hmakkapati,项目名称:qonos,代码行数:36,代码来源:test_snapshot.py


示例18: test_process_job_should_cancel_if_no_instance_id

    def test_process_job_should_cancel_if_no_instance_id(self):
        """Pin the update and failure notifications for a job without id.

        ``instance_id`` is removed from the job's metadata before processing.
        The test expects a 'qonos.job.update' notification with the
        PROCESSING payload and a 'qonos.job.failed' notification with the
        CANCELLED payload, both with empty metadata.
        """
        self._init_qonos_client(schedule=dict())
        self.mox.StubOutWithMock(utils, 'generate_notification')
        utils.generate_notification(None, 'qonos.job.run.start', mox.IsA(dict),
                                    mox.IsA(str))
        # The mocked update_job returns the status/timeout that the expected
        # payloads below carry.
        self.worker.update_job(fakes.JOB_ID, 'PROCESSING',
                               timeout=mox.IsA(datetime.datetime),
                               error_message=None).\
            AndReturn({'status': 'PROCESSING', 'timeout': 'new_timeout'})
        self.worker.update_job(fakes.JOB_ID, 'CANCELLED', timeout=None,
                               error_message=mox.IsA(str)).\
            AndReturn({'status': 'CANCELLED', 'timeout': self.job['timeout']})
        # Payload expected with the 'qonos.job.update' notification.
        expected_processing_payload = \
            {'job': {'status': 'PROCESSING',
                     'hard_timeout': self.job['hard_timeout'],
                     'created_at': self.job['created_at'],
                     'modified_at': self.job['modified_at'],
                     'retry_count': 1,
                     'schedule_id': '33333333-3333-3333-3333-33333333',
                     'worker_id': '11111111-1111-1111-1111-11111111',
                     'timeout': 'new_timeout',
                     'action': 'snapshot',
                     'id': '22222222-2222-2222-2222-22222222',
                     'tenant': '44444444-4444-4444-4444-44444444',
                     'metadata': {}}}

        # Payload expected with the 'qonos.job.failed' notification.
        expected_cancelled_payload = \
            {'job': {'status': 'CANCELLED',
                     'hard_timeout': self.job['hard_timeout'],
                     'created_at': self.job['created_at'],
                     'modified_at': self.job['modified_at'],
                     'retry_count': 1,
                     'schedule_id': '33333333-3333-3333-3333-33333333',
                     'worker_id': '11111111-1111-1111-1111-11111111',
                     'timeout': self.job['timeout'],
                     'action': 'snapshot',
                     'id': '22222222-2222-2222-2222-22222222',
                     'tenant': '44444444-4444-4444-4444-44444444',
                     'metadata': {}}}
        utils.generate_notification(None, 'qonos.job.update',
                                    expected_processing_payload,
                                    mox.IsA(str))
        utils.generate_notification(None, 'qonos.job.failed',
                                    expected_cancelled_payload,
                                    mox.IsA(str))

        # Switch from recording expectations to replaying them.
        self.mox.ReplayAll()

        processor = TestableSnapshotProcessor(self.nova_client)
        processor.init_processor(self.worker)
        # Drop the instance id so the job under test lacks it.
        del self.job['metadata']['instance_id']
        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:clefelhocz,项目名称:qonos,代码行数:54,代码来源:test_snapshot.py


示例19: test_process_job_should_succeed_immediately

    def test_process_job_should_succeed_immediately(self):
        """Pin the notifications for a run whose image is ACTIVE at once.

        Besides start and end notifications, a 'qonos.job.update'
        notification with the exact PROCESSING payload is expected.
        """
        timeutils.set_time_override()
        # --- record phase: expected nova calls ---
        self.nova_client.servers.get(mox.IsA(str)).AndReturn(MockServer())
        self.nova_client.servers.create_image(mox.IsA(str),
            mox.IsA(str), self.snapshot_meta).AndReturn(IMAGE_ID)
        # The very first status lookup reports the image as ACTIVE.
        self.nova_client.images.get(IMAGE_ID).AndReturn(
            MockImageStatus('ACTIVE'))
        mock_retention = MockRetention()
        self.nova_client.rax_scheduled_images_python_novaclient_ext.\
            get(mox.IsA(str)).AndReturn(mock_retention)
        self._init_worker_mock(False)

        # Payload expected with the 'qonos.job.update' notification.
        expected_processing_payload = \
            {'job': {'status': 'PROCESSING',
                     'hard_timeout': self.job['hard_timeout'],
                     'created_at': self.job['created_at'],
                     'modified_at': self.job['modified_at'],
                     'retry_count': 1,
                     'schedule_id': '33333333-3333-3333-3333-33333333',
                     'worker_id': '11111111-1111-1111-1111-11111111',
                     'timeout': self.job['timeout'],
                     'action': 'snapshot',
                     'id': '22222222-2222-2222-2222-22222222',
                     'tenant': '44444444-4444-4444-4444-44444444',
                     'metadata': {'instance_id':
                                  '55555555-5555-5555-5555-55555555'}}}

        # Replace the real notifier: start, update, then end are recorded.
        self.mox.StubOutWithMock(utils, 'generate_notification')
        utils.generate_notification(None, 'qonos.job.run.start', mox.IsA(dict),
                                    mox.IsA(str))
        utils.generate_notification(None, 'qonos.job.update',
                                    expected_processing_payload,
                                    mox.IsA(str))

        utils.generate_notification(None, 'qonos.job.run.end', mox.IsA(dict),
                                    mox.IsA(str))

        self.worker.get_qonos_client().AndReturn(self.qonos_client)
        self.qonos_client.delete_schedule(mox.IsA(str))
        # The processor is constructed before ReplayAll in this test.
        processor = TestableSnapshotProcessor(self.nova_client)
        # NOTE(review): the mocked 'DONE' update returns status 'CANCELLED' --
        # looks like a copy/paste leftover; confirm whether the return value
        # matters to the code under test.
        self.worker.update_job(fakes.JOB_ID, 'DONE', timeout=None,
                               error_message=None).\
            AndReturn({'status': 'CANCELLED', 'timeout': self.job['timeout']})
        # Switch from recording expectations to replaying them.
        self.mox.ReplayAll()

        processor.init_processor(self.worker)
        processor.process_job(self.job)

        # Check the recorded expectations against the calls actually made.
        self.mox.VerifyAll()
开发者ID:frielp,项目名称:qonos,代码行数:49,代码来源:test_snapshot.py


示例20: send_notification

 def send_notification(self, event_type, payload, level='INFO'):
     """Forward an event to ``utils.generate_notification``.

     ``None`` is always passed as the first argument; the remaining
     arguments are handed through unchanged.

     :param event_type: event type passed on as-is
     :param payload: payload passed on as-is
     :param level: level passed on as-is; defaults to ``'INFO'``
     """
     utils.generate_notification(None, event_type, payload, level)
开发者ID:hmakkapati,项目名称:qonos,代码行数:2,代码来源:worker.py



注:本文中的qonos.common.utils.generate_notification函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.serialize_datetimes函数代码示例发布时间:2022-05-26
下一篇:
Python timeutils.utcnow函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap