• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python testing_helpers.build_real_server函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testing_helpers.build_real_server函数的典型用法代码示例。如果您正苦于以下问题:Python build_real_server函数的具体用法?Python build_real_server怎么用?Python build_real_server使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了build_real_server函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_check_postgres

    def test_check_postgres(self, postgres_mock, capsys):
        """
        Verify the console output produced by check_postgres

        :param postgres_mock: mock of the get_remote_status function
        :param capsys: fixture used to read what was printed to the console
        """
        # Scenario: PostgreSQL gives no reply.
        # Expected output: PostgreSQL: FAILED
        postgres_mock.return_value = {"server_txt_version": None}
        server = build_real_server()
        strategy = CheckOutputStrategy()
        server.check_postgres(strategy)
        stdout, _ = capsys.readouterr()
        assert stdout == "\tPostgreSQL: FAILED\n"

        # Scenario: PostgreSQL replies with a correct configuration.
        # Expected output: every reported parameter is OK
        postgres_mock.return_value = dict(
            current_xlog=None,
            archive_command="wal to archive",
            pgespresso_installed=None,
            server_txt_version="PostgresSQL 9_4",
            data_directory="/usr/local/postgres",
            archive_mode="on",
            wal_level="replica",
        )

        # wal_level is part of the status (Postgres >= 9.0): it is checked
        server = build_real_server()
        server.check_postgres(strategy)
        stdout, _ = capsys.readouterr()
        assert stdout == "\tPostgreSQL: OK\n\twal_level: OK\n"

        # wal_level is missing (Postgres < 9.0): its check is skipped
        postgres_mock.return_value.pop("wal_level")
        server = build_real_server()
        server.check_postgres(strategy)
        stdout, _ = capsys.readouterr()
        assert stdout == "\tPostgreSQL: OK\n"

        # Scenario: wal_level and archive_command values are not acceptable.
        # Expected output: some parameters are FAILED
        postgres_mock.return_value = dict(
            current_xlog=None,
            archive_command=None,
            pgespresso_installed=None,
            server_txt_version="PostgresSQL 9_4",
            data_directory="/usr/local/postgres",
            archive_mode="on",
            wal_level="minimal",
        )
        strategy = CheckOutputStrategy()
        server.check_postgres(strategy)
        stdout, _ = capsys.readouterr()
        expected = (
            "\tPostgreSQL: OK\n"
            "\twal_level: FAILED (please set it to a higher level "
            "than 'minimal')\n"
        )
        assert stdout == expected
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:58,代码来源:test_server.py


示例2: test_check_postgres

    def test_check_postgres(self, postgres_mock, capsys):
        """
        Exercise check_postgres and compare what it prints on the console

        :param postgres_mock: mock of the get_remote_status function
        :param capsys: fixture that retrieves the output from the console
        """

        def printed_by_check(target_server, check_strategy):
            # Run the check and hand back only what went to standard output
            target_server.check_postgres(check_strategy)
            return capsys.readouterr()[0]

        # No reply by PostgreSQL -> PostgreSQL: FAILED
        postgres_mock.return_value = {'server_txt_version': None}
        server = build_real_server()
        strategy = CheckOutputStrategy()
        assert printed_by_check(server, strategy) == '\tPostgreSQL: FAILED\n'

        # Correct configuration -> all parameters OK
        postgres_mock.return_value = {
            'current_xlog': None,
            'archive_command': 'wal to archive',
            'pgespresso_installed': None,
            'server_txt_version': 'PostgresSQL 9_4',
            'data_directory': '/usr/local/postgres',
            'archive_mode': 'on',
            'wal_level': 'replica',
        }

        # Postgres version >= 9.0: wal_level is available and gets checked
        server = build_real_server()
        assert printed_by_check(server, strategy) == (
            "\tPostgreSQL: OK\n"
            "\twal_level: OK\n")

        # Postgres version < 9.0: no wal_level, so that check is avoided
        postgres_mock.return_value.pop('wal_level')
        server = build_real_server()
        assert printed_by_check(server, strategy) == "\tPostgreSQL: OK\n"

        # wal_level and archive_command values are not acceptable
        # -> some parameters FAILED
        postgres_mock.return_value = {
            'current_xlog': None,
            'archive_command': None,
            'pgespresso_installed': None,
            'server_txt_version': 'PostgresSQL 9_4',
            'data_directory': '/usr/local/postgres',
            'archive_mode': 'on',
            'wal_level': 'minimal',
        }
        strategy = CheckOutputStrategy()
        assert printed_by_check(server, strategy) == (
            "\tPostgreSQL: OK\n"
            "\twal_level: FAILED (please set it to a higher level "
            "than 'minimal')\n")
开发者ID:moench-tegeder,项目名称:barman,代码行数:57,代码来源:test_server.py


示例3: test_is_in_recovery

    def test_is_in_recovery(self, conn_mock):
        """
        Check the values returned by the is_in_recovery property

        :param conn_mock: mock whose return value plays the connection
        """
        server = build_real_server()
        connection = conn_mock.return_value
        cursor_mock = connection.cursor.return_value

        # Server version 8.4: the property evaluates as false
        connection.server_version = 80400
        assert not server.postgres.is_in_recovery

        # Server version 9.1 and the query answers True: in recovery
        connection.server_version = 90100
        cursor_mock.fetchone.return_value = [True]
        assert server.postgres.is_in_recovery
        cursor_mock.execute.assert_called_once_with(
            "SELECT pg_is_in_recovery()")

        # The query answers False: not in recovery
        cursor_mock.fetchone.return_value = [False]
        assert not server.postgres.is_in_recovery

        # Start the error scenarios from a clean mock
        conn_mock.reset_mock()

        # Both kinds of failure must be reported as None
        for failure in (PostgresConnectionError, psycopg2.ProgrammingError):
            cursor_mock.execute.side_effect = failure
            assert server.postgres.is_in_recovery is None
开发者ID:aydantasdemir,项目名称:barman,代码行数:31,代码来源:test_postgres.py


示例4: test_delete_running_backup

    def test_delete_running_backup(self, delete_mock, get_first_backup_mock, tmpdir, capsys):
        """
        Simple test for the deletion of a running backup.
        We want to test the behaviour of the server.delete_backup method
        when invoked on a running backup

        :param delete_mock: mock expected to receive the backup being
            deleted when the deletion is allowed
        :param get_first_backup_mock: mock whose return value is set to the
            id of the backup under test
        :param tmpdir: temporary directory used as barman_home and as
            location of the backup lock
        :param capsys: fixture used to read the error output
        """
        server = build_real_server({"barman_home": tmpdir.strpath})
        error_template = "Cannot delete a running backup (%s %s)"

        # Test the removal of a running backup. status STARTED
        backup_info_started = build_test_backup_info(status=BackupInfo.STARTED, server_name=server.config.name)
        get_first_backup_mock.return_value = backup_info_started.backup_id
        with ServerBackupLock(tmpdir.strpath, server.config.name):
            server.delete_backup(backup_info_started)
            out, err = capsys.readouterr()
            assert error_template % (server.config.name, backup_info_started.backup_id) in err

        # Test the removal of a running backup. status EMPTY
        backup_info_empty = build_test_backup_info(status=BackupInfo.EMPTY, server_name=server.config.name)
        get_first_backup_mock.return_value = backup_info_empty.backup_id
        with ServerBackupLock(tmpdir.strpath, server.config.name):
            server.delete_backup(backup_info_empty)
            out, err = capsys.readouterr()
            # The message must mention the backup that was actually passed
            # to delete_backup, i.e. the EMPTY one (not the STARTED one)
            assert error_template % (server.config.name, backup_info_empty.backup_id) in err

        # Test the removal of a backup while the lock is held. status DONE
        backup_info_done = build_test_backup_info(status=BackupInfo.DONE, server_name=server.config.name)
        with ServerBackupLock(tmpdir.strpath, server.config.name):
            server.delete_backup(backup_info_done)
            delete_mock.assert_called_with(backup_info_done)

        # Test the removal of a backup not running (no lock held).
        # status STARTED
        server.delete_backup(backup_info_started)
        delete_mock.assert_called_with(backup_info_started)
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:32,代码来源:test_server.py


示例5: test_has_pgespresso

    def test_has_pgespresso(self, conn_mock):
        """
        Check the values returned by the has_pgespresso property

        :param conn_mock: mock whose return value plays the connection
        """
        server = build_real_server()
        connection = conn_mock.return_value
        cursor_mock = connection.cursor.return_value

        # Server version 9.0: the property evaluates as false
        connection.server_version = 90000
        assert not server.postgres.has_pgespresso

        # Server version 9.1 and one matching extension row: present
        connection.server_version = 90100
        cursor_mock.fetchone.return_value = [1]
        assert server.postgres.has_pgespresso
        expected_query = (
            "SELECT count(*) FROM pg_extension "
            "WHERE extname = 'pgespresso'"
        )
        cursor_mock.execute.assert_called_once_with(expected_query)

        # No matching extension row: not present
        cursor_mock.fetchone.return_value = [0]
        assert not server.postgres.has_pgespresso

        # Start the error scenarios from a clean mock
        conn_mock.reset_mock()

        # Both kinds of failure must be reported as None
        for failure in (PostgresConnectionError, psycopg2.ProgrammingError):
            cursor_mock.execute.side_effect = failure
            assert server.postgres.has_pgespresso is None
开发者ID:aydantasdemir,项目名称:barman,代码行数:31,代码来源:test_postgres.py


示例6: test_check_archive

    def test_check_archive(self, tmpdir):
        """
        Test the check_archive method

        Three situations are covered: the xlog db file does not exist,
        the xlog db file exists but is empty, and the xlog db file has
        content. Each one is verified on its own CheckStrategy so that a
        failure recorded by a previous step cannot satisfy the assertions
        of the next one.

        :param tmpdir: temporary directory hosting the lock and wals
            directories
        """
        # Setup temp dir and server
        server = build_real_server(
            global_conf={"barman_lock_directory": tmpdir.mkdir("lock").strpath},
            main_conf={"wals_directory": tmpdir.mkdir("wals").strpath},
        )

        # Call the check on a nonexistent xlog db file. Expect it to fail
        strategy = CheckStrategy()
        server.check_archive(strategy)
        assert strategy.has_error is True
        assert strategy.check_result[0].check == "WAL archive"
        assert strategy.check_result[0].status is False

        # Call the check on an empty xlog db file. Expect it to contain errors.
        with open(server.xlogdb_file_name, "a"):
            # the open call forces the file creation
            pass

        # Fresh strategy: the previous one already carries an error, which
        # would make the following assertions pass unconditionally
        strategy = CheckStrategy()
        server.check_archive(strategy)
        assert strategy.has_error is True
        assert strategy.check_result[0].check == "WAL archive"
        assert strategy.check_result[0].status is False

        # Write something in the xlog db file and check for the results
        with server.xlogdb("w") as fxlogdb:
            fxlogdb.write("00000000000000000000")
        # The check strategy should contain no errors.
        strategy = CheckStrategy()
        server.check_archive(strategy)
        assert strategy.has_error is False
        assert len(strategy.check_result) == 0
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:35,代码来源:test_server.py


示例7: test_switch_xlog

    def test_switch_xlog(self, capsys):
        """
        Check the message printed by switch_xlog and whether a checkpoint
        is requested, with and without the force option

        :param capsys: fixture used to read the console output
        """
        switched_message = (
            "Switch to 000000010000000000000001 for server 'main'")

        server = build_real_server()
        server.postgres = MagicMock()

        # A switch happens without force: no checkpoint is issued
        server.postgres.switch_xlog.return_value = '000000010000000000000001'
        server.switch_xlog(force=False)
        stdout, _ = capsys.readouterr()
        assert switched_message in stdout
        assert server.postgres.checkpoint.called is False

        # A switch happens with force: a checkpoint is issued
        server.postgres.reset_mock()
        server.postgres.switch_xlog.return_value = '000000010000000000000001'
        server.switch_xlog(force=True)
        stdout, _ = capsys.readouterr()
        assert switched_message in stdout
        assert server.postgres.checkpoint.called is True

        # switch_xlog returns an empty name: nothing to switch
        server.postgres.reset_mock()
        server.postgres.switch_xlog.return_value = ''
        server.switch_xlog(force=False)
        stdout, _ = capsys.readouterr()
        assert "No switch required for server 'main'" in stdout
        assert server.postgres.checkpoint.called is False
开发者ID:moench-tegeder,项目名称:barman,代码行数:27,代码来源:test_server.py


示例8: test_prepare_tablespaces

 def test_prepare_tablespaces(self, tmpdir):
     """
     Check the commands issued while preparing tablespaces for recovery

     :param tmpdir: temporary directory hosting destination and wals
     """
     # Directory layout used by the test
     dest = tmpdir.mkdir('destination')
     wals = tmpdir.mkdir('wals')
     tblspc_dir = dest.join('pg_tblspc').strpath
     tblspc_link = dest.join('pg_tblspc').join('16387').strpath

     # A backup with a single tablespace located outside the data dir
     backup_info = testing_helpers.build_test_backup_info(
         tablespaces=[('tbs1', 16387, '/fake/location')])

     # Executor bound to a real server using the temporary wals directory
     server = testing_helpers.build_real_server(
         main_conf={'wals_directory': wals.strpath})
     executor = RecoveryExecutor(server.backup_manager)

     # A plain mock records the commands the executor asks for
     cmd_mock = Mock()
     executor.prepare_tablespaces(backup_info, cmd_mock, dest.strpath, {})

     create_dir = cmd_mock.create_dir_if_not_exists
     create_dir.assert_any_call(tblspc_dir)
     create_dir.assert_any_call('/fake/location')
     cmd_mock.delete_if_exists.assert_called_once_with(tblspc_link)
     cmd_mock.create_symbolic_link.assert_called_once_with(
         '/fake/location', tblspc_link)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:25,代码来源:test_recovery_executor.py


示例9: test_xlogdb_with_exception

    def test_xlogdb_with_exception(self, os_mock, tmpdir):
        """
        Check that the xlogdb context manager still runs its exit section
        when the managed block raises an exception

        :param os_mock: mock for os module
        :param tmpdir: temporary directory unique to the test invocation
        """
        # Keep the real os.path available on the mocked os module
        os_mock.path = os.path

        # Server working inside the temporary directory
        lock_dir = tmpdir.mkdir('lock').strpath
        wals_dir = tmpdir.mkdir('wals').strpath
        server = build_real_server(
            global_conf={"barman_lock_directory": lock_dir},
            main_conf={"wals_directory": wals_dir})

        # Write to the xlogdb file and fail on purpose inside the block
        with pytest.raises(ExceptionTest):
            with server.xlogdb('w') as fxlogdb:
                fxlogdb.write("00000000000000000000")
                raise ExceptionTest()

        # fsync being called shows that the "exit" section of the
        # context manager has been executed despite the exception
        assert os_mock.fsync.called
开发者ID:moench-tegeder,项目名称:barman,代码行数:25,代码来源:test_server.py


示例10: test_recover_xlog

 def test_recover_xlog(self, rsync_pg_mock, tmpdir):
     """
     Test the recovery of the xlogs of a backup

     The copy is executed twice: first without a remote command (local
     copy), then with 'remote_command' as remote command.

     :param rsync_pg_mock: Mock rsync object for the purpose of this test
     :param tmpdir: temporary directory hosting the fake WAL archive and
         the recovery destination
     """
     # Build basic folders/files structure
     dest = tmpdir.mkdir('destination')
     wals = tmpdir.mkdir('wals')
     # The WAL file lives in the sub-directory computed by xlog.hash_dir
     xlog_dir = wals.mkdir(xlog.hash_dir('000000000000000000000002'))
     xlog_file = xlog_dir.join('000000000000000000000002')
     xlog_file.write('dummy content')
     server = testing_helpers.build_real_server(
         main_conf={'wals_directory': wals.strpath})
     # build executor
     executor = RecoveryExecutor(server.backup_manager)
     required_wals = (WalFileInfo.from_xlogdb_line(
         '000000000000000000000002\t42\t43\tNone\n'),)
     executor.xlog_copy(required_wals, dest.strpath, None)
     # check for a correct invocation of rsync using local paths
     # NOTE(review): this calls assert_called_once(), not
     # assert_called_once_with(); the arguments listed here are presumably
     # not compared against the real call (and recent mock versions may
     # reject them) -- confirm the intended expectation before relying on
     # this check.
     rsync_pg_mock.from_file_list.assert_called_once(
         ['000000000000000000000002'],
         xlog_dir.strpath,
         dest.strpath)
     # reset mock calls
     rsync_pg_mock.reset_mock()
     required_wals = (WalFileInfo.from_xlogdb_line(
         '000000000000000000000002\t42\t43\tNone\n'),)
     # Replace the compression manager with a mock for the remote scenario
     executor.backup_manager.compression_manager = Mock()
     executor.xlog_copy(required_wals, dest.strpath, 'remote_command')
     # check for the invocation of rsync on a remote call
     # NOTE(review): same remark as above, assert_called_once() is used
     # with keyword arguments -- confirm whether assert_called_once_with()
     # was intended.
     rsync_pg_mock.assert_called_once(network_compression=False,
                                      bwlimit=None,
                                      ssh='remote_command')
开发者ID:stig,项目名称:barman,代码行数:33,代码来源:test_recovery_executor.py


示例11: test_check_archive

    def test_check_archive(self, tmpdir):
        """
        Check the outcome of check_archive before and after something is
        written in the xlog db file

        :param tmpdir: temporary directory hosting the lock and wals
            directories
        """
        # Server working inside the temporary directory
        lock_dir = tmpdir.mkdir('lock').strpath
        wals_dir = tmpdir.mkdir('wals').strpath
        server = build_real_server(
            global_conf={"barman_lock_directory": lock_dir},
            main_conf={"wals_directory": wals_dir})

        # Nothing has been written yet: the check must report an error
        strategy = CheckStrategy()
        server.check_archive(strategy)
        first_result = strategy.check_result[0]
        assert strategy.has_error is True
        assert first_result.check == 'WAL archive'
        assert first_result.status is False

        # Put some content in the xlog db file
        with server.xlogdb('w') as fxlogdb:
            fxlogdb.write("00000000000000000000")

        # A new strategy must now stay free of errors and of results
        strategy = CheckStrategy()
        server.check_archive(strategy)
        assert strategy.has_error is False
        assert len(strategy.check_result) == 0
开发者ID:jamonation,项目名称:barman,代码行数:27,代码来源:test_server.py


示例12: test_current_xlog_info

    def test_current_xlog_info(self, is_in_recovery_mock, conn_mock):
        """
        Check the value returned by current_xlog_info and the query used
        to obtain it

        :param is_in_recovery_mock: mock of the is_in_recovery property
        :param conn_mock: mock whose return value plays the connection
        """
        server = build_real_server()
        cursor_mock = conn_mock.return_value.cursor.return_value

        # Row handed back by the mocked cursor
        current_xlog_info = {
            'location': '0/35000528',
            'file_name': '000000010000000000000035',
            'file_offset': 1320,
            'timestamp': datetime.datetime(2016, 3, 30, 17, 4, 20, 271376),
        }
        cursor_mock.fetchone.return_value = current_xlog_info
        is_in_recovery_mock.return_value = False

        # The property must return the row as it is
        remote_loc = server.postgres.current_xlog_info
        assert remote_loc == current_xlog_info

        expected_query = (
            'SELECT location, (pg_xlogfile_name_offset(location)).*, '
            'CURRENT_TIMESTAMP AS timestamp '
            'FROM pg_current_xlog_location() AS location'
        )
        cursor_mock.execute.assert_called_once_with(expected_query)

        # Start the error scenarios from a clean mock
        conn_mock.reset_mock()

        # Both kinds of failure must be reported as None
        for failure in (PostgresConnectionError, psycopg2.ProgrammingError):
            cursor_mock.execute.side_effect = failure
            assert server.postgres.current_xlog_info is None
开发者ID:jamonation,项目名称:barman,代码行数:35,代码来源:test_postgres.py


示例13: test_current_xlog_file_name

    def test_current_xlog_file_name(self, is_in_recovery_mock, conn_mock):
        """
        Check the value returned by the current_xlog_file_name property

        :param is_in_recovery_mock: mock of the is_in_recovery property
        :param conn_mock: mock whose return value plays the connection
        """
        server = build_real_server()
        cursor_mock = conn_mock.return_value.cursor.return_value

        # Row handed back by the mocked cursor
        cursor_mock.fetchone.return_value = {
            'location': '0/35000528',
            'file_name': '000000010000000000000035',
            'file_offset': 1320,
            'timestamp': datetime.datetime(2016, 3, 30, 17, 4, 20, 271376),
        }

        # The property is mocked through its return value
        is_in_recovery_mock.return_value = False
        file_name = server.postgres.current_xlog_file_name
        assert file_name == '000000010000000000000035'

        # Start the error scenarios from a clean mock
        conn_mock.reset_mock()

        # Both kinds of failure must be reported as None
        for failure in (PostgresConnectionError, psycopg2.ProgrammingError):
            cursor_mock.execute.side_effect = failure
            assert server.postgres.current_xlog_file_name is None
开发者ID:jamonation,项目名称:barman,代码行数:30,代码来源:test_postgres.py


示例14: test_stop_exclusive_backup

    def test_stop_exclusive_backup(self, conn):
        """
        Check the query issued by stop_exclusive_backup and its result
        when the query fails

        :param conn: a mock that imitates a connection to PostgreSQL
        """
        server = build_real_server()

        # Test 1: a normal call returns a truthy result
        assert server.postgres.stop_exclusive_backup()

        # ... and runs exactly this query on the cursor
        expected_query = (
            'SELECT location, '
            '(pg_xlogfile_name_offset(location)).*, '
            'now() AS timestamp '
            'FROM pg_stop_backup() AS location'
        )
        cursor_mock = conn.return_value.cursor.return_value
        cursor_mock.execute.assert_called_once_with(expected_query)

        # Test 2: the query raises an error, the method returns None
        conn.reset_mock()
        cursor_mock.execute.side_effect = psycopg2.Error
        result = server.postgres.stop_exclusive_backup()
        assert result is None
开发者ID:jamonation,项目名称:barman,代码行数:26,代码来源:test_postgres.py


示例15: test_streaming_server_txt_version

    def test_streaming_server_txt_version(self, conn):
        """
        Check how server_txt_version renders the numeric server version

        :param conn: mock whose return value plays the connection
        """
        server = build_real_server(
            main_conf={
                'streaming_archiver': True,
                'streaming_conninfo': 'dummy=param',
            })

        # (numeric server_version, expected textual version)
        expectations = (
            (80300, '8.3.0'),
            (90000, '9.0.0'),
            (90005, '9.0.5'),
            (100201, '10.2.1'),
            (101811, '10.18.11'),
            (0, '0.0.0'),
        )
        for numeric_version, txt_version in expectations:
            conn.return_value.server_version = numeric_version
            assert server.streaming.server_txt_version == txt_version
开发者ID:zacchiro,项目名称:barman,代码行数:27,代码来源:test_postgres.py


示例16: test_get_streaming_remote_status

    def test_get_streaming_remote_status(self, conn):
        """
        simple test for the get_remote_status method of the streaming
        connection

        Three scenarios are covered: a working streaming connection, a
        connection where IDENTIFY_SYSTEM fails and a failing connection.

        :param conn: mock whose return value plays the connection object
        """
        # Build a server
        server = build_real_server(
            main_conf={
                'streaming_archiver': True,
                'streaming_conninfo': 'dummy=param'})

        # Working streaming connection
        conn.return_value.server_version = 90300
        result = server.streaming.get_remote_status()
        assert result['streaming'] is True

        # The status has been obtained by running IDENTIFY_SYSTEM
        cursor_mock = conn.return_value.cursor.return_value
        cursor_mock.execute.assert_called_once_with("IDENTIFY_SYSTEM")
        # Working non-streaming connection: the command raises an error
        conn.reset_mock()
        cursor_mock.execute.side_effect = psycopg2.ProgrammingError
        result = server.streaming.get_remote_status()
        assert result['streaming'] is False

        # The command has been attempted in this scenario as well
        cursor_mock.execute.assert_called_once_with("IDENTIFY_SYSTEM")
        # Connection failed
        conn.reset_mock()
        conn.side_effect = PostgresConnectionError
        result = server.streaming.get_remote_status()
        assert result['streaming'] is None
开发者ID:zacchiro,项目名称:barman,代码行数:29,代码来源:test_postgres.py


示例17: test_streaming_server_txt_version

    def test_streaming_server_txt_version(self, conn_mock):
        """
        Check server_txt_version with a failing connection and with a set
        of numeric server versions

        :param conn_mock: mock whose return value plays the connection
        """
        streaming_conf = {
            "streaming_archiver": True,
            "streaming_conninfo": "dummy=param",
        }
        server = build_real_server(main_conf=streaming_conf)

        # The connection cannot be established: no version is available
        conn_mock.side_effect = PostgresConnectionError
        assert server.streaming.server_txt_version is None

        # From now on the connection works
        conn_mock.side_effect = None

        # numeric server_version -> expected textual version, in order
        for numeric_version, txt_version in [
            (80300, "8.3.0"),
            (90000, "9.0.0"),
            (90005, "9.0.5"),
            (100201, "10.2.1"),
            (101811, "10.18.11"),
            (0, "0.0.0"),
        ]:
            conn_mock.return_value.server_version = numeric_version
            assert server.streaming.server_txt_version == txt_version
开发者ID:aydantasdemir,项目名称:barman,代码行数:31,代码来源:test_postgres.py


示例18: test_recover_basebackup_copy

 def test_recover_basebackup_copy(self, rsync_pg_mock, tmpdir):
     """
     Check the rsync objects built and the copies requested while the
     content of a backup is copied during a recovery

     :param rsync_pg_mock: Mock rsync object for the purpose of this test
     :param tmpdir: temporary directory hosting the recovery destination
     """
     # Recovery destination
     dest_path = tmpdir.mkdir('destination').strpath

     # Server and backup with a single tablespace
     server = testing_helpers.build_real_server()
     backup_info = testing_helpers.build_test_backup_info(
         server=server,
         tablespaces=[('tbs1', 16387, '/fake/location')])

     # Executor with a global and a per-tablespace bandwidth limit
     executor = RecoveryExecutor(server.backup_manager)
     executor.config.tablespace_bandwidth_limit = {'tbs1': ''}
     executor.config.bandwidth_limit = 10
     executor.basebackup_copy(backup_info, dest_path, tablespaces=None)

     # Most recent rsync object: global limit, tablespace link protected
     rsync_pg_mock.assert_called_with(
         network_compression=False,
         bwlimit=10,
         ssh=None,
         path=None,
         exclude_and_protect=['/pg_tblspc/16387'])
     # An rsync object using the tablespace limit has been built as well
     rsync_pg_mock.assert_any_call(
         network_compression=False,
         bwlimit='',
         ssh=None,
         path=None,
         check=True)

     # Copies requested: the tablespace at some point, the data dir last
     smart_copy = rsync_pg_mock.return_value.smart_copy
     smart_copy.assert_any_call(
         '/some/barman/home/main/base/1234567890/16387/',
         '/fake/location', None)
     smart_copy.assert_called_with(
         '/some/barman/home/main/base/1234567890/data/',
         dest_path, None)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:29,代码来源:test_recovery_executor.py


示例19: test_get_wal_info

 def test_get_wal_info(self, get_wal_mock, tmpdir):
     """
     Check the wals per second and the total time in seconds computed
     by get_wal_info

     :param get_wal_mock: mock whose return value is the fake WAL list
     :param tmpdir: temporary directory used as barman_home
     """
     # Test server living inside the temporary directory
     server = build_real_server(
         global_conf={'barman_home': tmpdir.strpath})

     # Three fake WAL files are returned by the mock. The first one is
     # both the start and the stop WAL of the backup
     xlogdb_lines = (
         "000000010000000000000002\t16777216\t1434450086.53\tNone\n",
         "000000010000000000000003\t16777216\t1434450087.54\tNone\n",
         "000000010000000000000004\t16777216\t1434450088.55\tNone\n",
     )
     wal_list = [
         WalFileInfo.from_xlogdb_line(line) for line in xlogdb_lines]
     get_wal_mock.return_value = wal_list
     first_wal = wal_list[0]
     last_wal = wal_list[-1]

     backup_info = build_test_backup_info(
         server=server,
         begin_wal=first_wal.name,
         end_wal=first_wal.name)
     backup_info.save()

     # Expected total time: last WAL timestamp - first WAL timestamp
     wal_total_seconds = last_wal.time - first_wal.time
     # Expected rate: number of WAL files / total time in seconds
     wals_per_second = len(wal_list) / wal_total_seconds

     wal_info = server.get_wal_info(backup_info)
     assert wal_info
     assert wal_info['wal_total_seconds'] == wal_total_seconds
     assert wal_info['wals_per_second'] == wals_per_second
开发者ID:moench-tegeder,项目名称:barman,代码行数:35,代码来源:test_server.py


示例20: test_drop_repslot

    def test_drop_repslot(self, capsys):
        """
        Test the 'drop_repslot' method of the server object

        :param capsys: fixture used to read the error output
        """

        # No operation if there is no streaming connection
        server = build_real_server()
        server.streaming = None
        assert server.drop_repslot() is None

        # No operation if the slot name is empty
        server.streaming = MagicMock()
        server.config.slot_name = None
        server.streaming.server_version = 90400
        assert server.drop_repslot() is None

        # If there is a streaming connection and the replication
        # slot is defined, then the drop of the replication slot
        # should be requested through the streaming connection
        server.config.slot_name = 'test_repslot'
        server.streaming.server_version = 90400
        server.drop_repslot()
        drop_repslot = server.streaming.drop_repslot
        drop_repslot.assert_called_with('test_repslot')

        # If the replication slot doesn't exist
        # check that the underlying exception is correctly managed:
        # the call is still made and a message is written on stderr
        drop_repslot.side_effect = PostgresInvalidReplicationSlot
        server.drop_repslot()
        drop_repslot.assert_called_with('test_repslot')
        out, err = capsys.readouterr()
        assert "Replication slot 'test_repslot' does not exist" in err
开发者ID:moench-tegeder,项目名称:barman,代码行数:33,代码来源:test_server.py



注:本文中的testing_helpers.build_real_server函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap