本文整理汇总了Python中testing_helpers.build_real_server函数的典型用法代码示例。如果您正苦于以下问题:Python build_real_server函数的具体用法?Python build_real_server怎么用?Python build_real_server使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了build_real_server函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_check_postgres
def test_check_postgres(self, postgres_mock, capsys):
"""
Test management of check_postgres view output
:param postgres_mock: mock get_remote_status function
:param capsys: retrieve output from consolle
"""
postgres_mock.return_value = {"server_txt_version": None}
# Create server
server = build_real_server()
# Case: no reply by PostgreSQL
# Expect out: PostgreSQL: FAILED
strategy = CheckOutputStrategy()
server.check_postgres(strategy)
(out, err) = capsys.readouterr()
assert out == " PostgreSQL: FAILED\n"
# Case: correct configuration
postgres_mock.return_value = {
"current_xlog": None,
"archive_command": "wal to archive",
"pgespresso_installed": None,
"server_txt_version": "PostgresSQL 9_4",
"data_directory": "/usr/local/postgres",
"archive_mode": "on",
"wal_level": "replica",
}
# Expect out: all parameters: OK
# Postgres version >= 9.0 - check wal_level
server = build_real_server()
server.check_postgres(strategy)
(out, err) = capsys.readouterr()
assert out == "\tPostgreSQL: OK\n" "\twal_level: OK\n"
# Postgres version < 9.0 - avoid wal_level check
del postgres_mock.return_value["wal_level"]
server = build_real_server()
server.check_postgres(strategy)
(out, err) = capsys.readouterr()
assert out == "\tPostgreSQL: OK\n"
# Case: wal_level and archive_command values are not acceptable
postgres_mock.return_value = {
"current_xlog": None,
"archive_command": None,
"pgespresso_installed": None,
"server_txt_version": "PostgresSQL 9_4",
"data_directory": "/usr/local/postgres",
"archive_mode": "on",
"wal_level": "minimal",
}
# Expect out: some parameters: FAILED
strategy = CheckOutputStrategy()
server.check_postgres(strategy)
(out, err) = capsys.readouterr()
assert out == "\tPostgreSQL: OK\n" "\twal_level: FAILED (please set it to a higher level " "than 'minimal')\n"
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:58,代码来源:test_server.py
示例2: test_check_postgres
def test_check_postgres(self, postgres_mock, capsys):
"""
Test management of check_postgres view output
:param postgres_mock: mock get_remote_status function
:param capsys: retrieve output from consolle
"""
postgres_mock.return_value = {'server_txt_version': None}
# Create server
server = build_real_server()
# Case: no reply by PostgreSQL
# Expect out: PostgreSQL: FAILED
strategy = CheckOutputStrategy()
server.check_postgres(strategy)
(out, err) = capsys.readouterr()
assert out == ' PostgreSQL: FAILED\n'
# Case: correct configuration
postgres_mock.return_value = {'current_xlog': None,
'archive_command': 'wal to archive',
'pgespresso_installed': None,
'server_txt_version': 'PostgresSQL 9_4',
'data_directory': '/usr/local/postgres',
'archive_mode': 'on',
'wal_level': 'replica'}
# Expect out: all parameters: OK
# Postgres version >= 9.0 - check wal_level
server = build_real_server()
server.check_postgres(strategy)
(out, err) = capsys.readouterr()
assert out == "\tPostgreSQL: OK\n" \
"\twal_level: OK\n"
# Postgres version < 9.0 - avoid wal_level check
del postgres_mock.return_value['wal_level']
server = build_real_server()
server.check_postgres(strategy)
(out, err) = capsys.readouterr()
assert out == "\tPostgreSQL: OK\n"
# Case: wal_level and archive_command values are not acceptable
postgres_mock.return_value = {'current_xlog': None,
'archive_command': None,
'pgespresso_installed': None,
'server_txt_version': 'PostgresSQL 9_4',
'data_directory': '/usr/local/postgres',
'archive_mode': 'on',
'wal_level': 'minimal'}
# Expect out: some parameters: FAILED
strategy = CheckOutputStrategy()
server.check_postgres(strategy)
(out, err) = capsys.readouterr()
assert out == "\tPostgreSQL: OK\n" \
"\twal_level: FAILED (please set it to a higher level " \
"than 'minimal')\n"
开发者ID:moench-tegeder,项目名称:barman,代码行数:57,代码来源:test_server.py
示例3: test_is_in_recovery
def test_is_in_recovery(self, conn_mock):
"""
simple test for is_in_recovery property
"""
# Build a server
server = build_real_server()
cursor_mock = conn_mock.return_value.cursor.return_value
# Too old
conn_mock.return_value.server_version = 80400
assert not server.postgres.is_in_recovery
# In recovery
conn_mock.return_value.server_version = 90100
cursor_mock.fetchone.return_value = [True]
assert server.postgres.is_in_recovery
cursor_mock.execute.assert_called_once_with("SELECT pg_is_in_recovery()")
# Not in recovery
cursor_mock.fetchone.return_value = [False]
assert not server.postgres.is_in_recovery
# Reset mock
conn_mock.reset_mock()
# Test error management
cursor_mock.execute.side_effect = PostgresConnectionError
assert server.postgres.is_in_recovery is None
cursor_mock.execute.side_effect = psycopg2.ProgrammingError
assert server.postgres.is_in_recovery is None
开发者ID:aydantasdemir,项目名称:barman,代码行数:31,代码来源:test_postgres.py
示例4: test_delete_running_backup
def test_delete_running_backup(self, delete_mock, get_first_backup_mock, tmpdir, capsys):
"""
Simple test for the deletion of a running backup.
We want to test the behaviour of the server.delete_backup method
when invoked on a running backup
"""
# Test the removal of a running backup. status STARTED
server = build_real_server({"barman_home": tmpdir.strpath})
backup_info_started = build_test_backup_info(status=BackupInfo.STARTED, server_name=server.config.name)
get_first_backup_mock.return_value = backup_info_started.backup_id
with ServerBackupLock(tmpdir.strpath, server.config.name):
server.delete_backup(backup_info_started)
out, err = capsys.readouterr()
assert "Cannot delete a running backup (%s %s)" % (server.config.name, backup_info_started.backup_id) in err
# Test the removal of a running backup. status EMPTY
backup_info_empty = build_test_backup_info(status=BackupInfo.EMPTY, server_name=server.config.name)
get_first_backup_mock.return_value = backup_info_empty.backup_id
with ServerBackupLock(tmpdir.strpath, server.config.name):
server.delete_backup(backup_info_empty)
out, err = capsys.readouterr()
assert "Cannot delete a running backup (%s %s)" % (server.config.name, backup_info_started.backup_id) in err
# Test the removal of a running backup. status DONE
backup_info_done = build_test_backup_info(status=BackupInfo.DONE, server_name=server.config.name)
with ServerBackupLock(tmpdir.strpath, server.config.name):
server.delete_backup(backup_info_done)
delete_mock.assert_called_with(backup_info_done)
# Test the removal of a backup not running. status STARTED
server.delete_backup(backup_info_started)
delete_mock.assert_called_with(backup_info_started)
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:32,代码来源:test_server.py
示例5: test_has_pgespresso
def test_has_pgespresso(self, conn_mock):
"""
simple test for has_pgespresso property
"""
# Build a server
server = build_real_server()
cursor_mock = conn_mock.return_value.cursor.return_value
# Too old
conn_mock.return_value.server_version = 90000
assert not server.postgres.has_pgespresso
# Extension present
conn_mock.return_value.server_version = 90100
cursor_mock.fetchone.return_value = [1]
assert server.postgres.has_pgespresso
cursor_mock.execute.assert_called_once_with("SELECT count(*) FROM pg_extension " "WHERE extname = 'pgespresso'")
# Extension not present
cursor_mock.fetchone.return_value = [0]
assert not server.postgres.has_pgespresso
# Reset mock
conn_mock.reset_mock()
# Test error management
cursor_mock.execute.side_effect = PostgresConnectionError
assert server.postgres.has_pgespresso is None
cursor_mock.execute.side_effect = psycopg2.ProgrammingError
assert server.postgres.has_pgespresso is None
开发者ID:aydantasdemir,项目名称:barman,代码行数:31,代码来源:test_postgres.py
示例6: test_check_archive
def test_check_archive(self, tmpdir):
"""
Test the check_archive method
"""
# Setup temp dir and server
server = build_real_server(
global_conf={"barman_lock_directory": tmpdir.mkdir("lock").strpath},
main_conf={"wals_directory": tmpdir.mkdir("wals").strpath},
)
strategy = CheckStrategy()
# Call the server on an unexistent xlog file. expect it to fail
server.check_archive(strategy)
assert strategy.has_error is True
assert strategy.check_result[0].check == "WAL archive"
assert strategy.check_result[0].status is False
# Call the check on an empty xlog file. expect it to contain errors.
with open(server.xlogdb_file_name, "a"):
# the open call forces the file creation
pass
server.check_archive(strategy)
assert strategy.has_error is True
assert strategy.check_result[0].check == "WAL archive"
assert strategy.check_result[0].status is False
# Write something in the xlog db file and check for the results
with server.xlogdb("w") as fxlogdb:
fxlogdb.write("00000000000000000000")
# The check strategy should contain no errors.
strategy = CheckStrategy()
server.check_archive(strategy)
assert strategy.has_error is False
assert len(strategy.check_result) == 0
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:35,代码来源:test_server.py
示例7: test_switch_xlog
def test_switch_xlog(self, capsys):
server = build_real_server()
server.postgres = MagicMock()
server.postgres.switch_xlog.return_value = '000000010000000000000001'
server.switch_xlog(force=False)
out, err = capsys.readouterr()
assert "Switch to 000000010000000000000001 for server 'main'" \
in out
assert server.postgres.checkpoint.called is False
server.postgres.reset_mock()
server.postgres.switch_xlog.return_value = '000000010000000000000001'
server.switch_xlog(force=True)
out, err = capsys.readouterr()
assert "Switch to 000000010000000000000001 for server 'main'" \
in out
assert server.postgres.checkpoint.called is True
server.postgres.reset_mock()
server.postgres.switch_xlog.return_value = ''
server.switch_xlog(force=False)
out, err = capsys.readouterr()
assert "No switch required for server 'main'" in out
assert server.postgres.checkpoint.called is False
开发者ID:moench-tegeder,项目名称:barman,代码行数:27,代码来源:test_server.py
示例8: test_prepare_tablespaces
def test_prepare_tablespaces(self, tmpdir):
"""
Test tablespaces preparation for recovery
"""
# Prepare basic directory/files structure
dest = tmpdir.mkdir('destination')
wals = tmpdir.mkdir('wals')
backup_info = testing_helpers.build_test_backup_info(
tablespaces=[('tbs1', 16387, '/fake/location')])
# build an executor
server = testing_helpers.build_real_server(
main_conf={'wals_directory': wals.strpath})
executor = RecoveryExecutor(server.backup_manager)
# use a mock as cmd obj
cmd_mock = Mock()
executor.prepare_tablespaces(backup_info, cmd_mock, dest.strpath, {})
cmd_mock.create_dir_if_not_exists.assert_any_call(
dest.join('pg_tblspc').strpath)
cmd_mock.create_dir_if_not_exists.assert_any_call(
'/fake/location')
cmd_mock.delete_if_exists.assert_called_once_with(
dest.join('pg_tblspc').join('16387').strpath)
cmd_mock.create_symbolic_link.assert_called_once_with(
'/fake/location',
dest.join('pg_tblspc').join('16387').strpath)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:25,代码来源:test_recovery_executor.py
示例9: test_xlogdb_with_exception
def test_xlogdb_with_exception(self, os_mock, tmpdir):
"""
Testing the execution of xlog-db operations with an Exception
:param os_mock: mock for os module
:param tmpdir: temporary directory unique to the test invocation
"""
# unpatch os.path
os_mock.path = os.path
# Setup temp dir and server
server = build_real_server(
global_conf={
"barman_lock_directory": tmpdir.mkdir('lock').strpath
},
main_conf={
"wals_directory": tmpdir.mkdir('wals').strpath
})
# Test the execution of the fsync on xlogdb file forcing an exception
with pytest.raises(ExceptionTest):
with server.xlogdb('w') as fxlogdb:
fxlogdb.write("00000000000000000000")
raise ExceptionTest()
# Check call on fsync method. If the call have been issued,
# the "exit" section of the contextmanager have been executed
assert os_mock.fsync.called
开发者ID:moench-tegeder,项目名称:barman,代码行数:25,代码来源:test_server.py
示例10: test_recover_xlog
def test_recover_xlog(self, rsync_pg_mock, tmpdir):
"""
Test the recovery of the xlogs of a backup
:param rsync_pg_mock: Mock rsync object for the purpose if this test
"""
# Build basic folders/files structure
dest = tmpdir.mkdir('destination')
wals = tmpdir.mkdir('wals')
xlog_dir = wals.mkdir(xlog.hash_dir('000000000000000000000002'))
xlog_file = xlog_dir.join('000000000000000000000002')
xlog_file.write('dummy content')
server = testing_helpers.build_real_server(
main_conf={'wals_directory': wals.strpath})
# build executor
executor = RecoveryExecutor(server.backup_manager)
required_wals = (WalFileInfo.from_xlogdb_line(
'000000000000000000000002\t42\t43\tNone\n'),)
executor.xlog_copy(required_wals, dest.strpath, None)
# check for a correct invocation of rsync using local paths
rsync_pg_mock.from_file_list.assert_called_once(
['000000000000000000000002'],
xlog_dir.strpath,
dest.strpath)
# reset mock calls
rsync_pg_mock.reset_mock()
required_wals = (WalFileInfo.from_xlogdb_line(
'000000000000000000000002\t42\t43\tNone\n'),)
executor.backup_manager.compression_manager = Mock()
executor.xlog_copy(required_wals, dest.strpath, 'remote_command')
# check for the invocation of rsync on a remote call
rsync_pg_mock.assert_called_once(network_compression=False,
bwlimit=None,
ssh='remote_command')
开发者ID:stig,项目名称:barman,代码行数:33,代码来源:test_recovery_executor.py
示例11: test_check_archive
def test_check_archive(self, tmpdir):
"""
Test the check_archive method
"""
# Setup temp dir and server
server = build_real_server(
global_conf={
"barman_lock_directory": tmpdir.mkdir('lock').strpath
},
main_conf={
"wals_directory": tmpdir.mkdir('wals').strpath
})
strategy = CheckStrategy()
# Call the check on an empty xlog file. expect it to contain errors.
server.check_archive(strategy)
assert strategy.has_error is True
assert strategy.check_result[0].check == 'WAL archive'
assert strategy.check_result[0].status is False
# Write something in the xlog db file and check for the results
with server.xlogdb('w') as fxlogdb:
fxlogdb.write("00000000000000000000")
# The check strategy should contain no errors.
strategy = CheckStrategy()
server.check_archive(strategy)
assert strategy.has_error is False
assert len(strategy.check_result) == 0
开发者ID:jamonation,项目名称:barman,代码行数:27,代码来源:test_server.py
示例12: test_current_xlog_info
def test_current_xlog_info(self, is_in_recovery_mock, conn_mock):
"""
Test correct select xlog_loc
"""
# Build and configure a server using a mock
server = build_real_server()
cursor_mock = conn_mock.return_value.cursor.return_value
timestamp = datetime.datetime(2016, 3, 30, 17, 4, 20, 271376)
current_xlog_info = dict(
location='0/35000528',
file_name='000000010000000000000035',
file_offset=1320,
timestamp=timestamp,
)
cursor_mock.fetchone.return_value = current_xlog_info
is_in_recovery_mock.return_value = False
# sequence
remote_loc = server.postgres.current_xlog_info
assert remote_loc == current_xlog_info
cursor_mock.execute.assert_called_once_with(
'SELECT location, (pg_xlogfile_name_offset(location)).*, '
'CURRENT_TIMESTAMP AS timestamp '
'FROM pg_current_xlog_location() AS location')
# Reset mock
conn_mock.reset_mock()
# Test error management
cursor_mock.execute.side_effect = PostgresConnectionError
assert server.postgres.current_xlog_info is None
cursor_mock.execute.side_effect = psycopg2.ProgrammingError
assert server.postgres.current_xlog_info is None
开发者ID:jamonation,项目名称:barman,代码行数:35,代码来源:test_postgres.py
示例13: test_current_xlog_file_name
def test_current_xlog_file_name(self, is_in_recovery_mock, conn_mock):
"""
simple test for current_xlog property
"""
# Build a server
server = build_real_server()
cursor_mock = conn_mock.return_value.cursor.return_value
timestamp = datetime.datetime(2016, 3, 30, 17, 4, 20, 271376)
cursor_mock.fetchone.return_value = dict(
location='0/35000528',
file_name='000000010000000000000035',
file_offset=1320,
timestamp=timestamp,
)
# Special way to mock a property
is_in_recovery_mock.return_value = False
assert server.postgres.current_xlog_file_name == (
'000000010000000000000035')
# Reset mock
conn_mock.reset_mock()
# Test error management
cursor_mock.execute.side_effect = PostgresConnectionError
assert server.postgres.current_xlog_file_name is None
cursor_mock.execute.side_effect = psycopg2.ProgrammingError
assert server.postgres.current_xlog_file_name is None
开发者ID:jamonation,项目名称:barman,代码行数:30,代码来源:test_postgres.py
示例14: test_stop_exclusive_backup
def test_stop_exclusive_backup(self, conn):
"""
Basic test for the stop_exclusive_backup method
:param conn: a mock that imitates a connection to PostgreSQL
"""
# Build a server
server = build_real_server()
# Expect no errors on normal call
assert server.postgres.stop_exclusive_backup()
# check the correct invocation of the execute method
cursor_mock = conn.return_value.cursor.return_value
cursor_mock.execute.assert_called_once_with(
'SELECT location, '
'(pg_xlogfile_name_offset(location)).*, '
'now() AS timestamp '
'FROM pg_stop_backup() AS location'
)
# Test 2: Setup the mock to trigger an exception
# expect the method to return None
conn.reset_mock()
cursor_mock.execute.side_effect = psycopg2.Error
# Check that the method returns None as result
assert server.postgres.stop_exclusive_backup() is None
开发者ID:jamonation,项目名称:barman,代码行数:26,代码来源:test_postgres.py
示例15: test_streaming_server_txt_version
def test_streaming_server_txt_version(self, conn):
"""
simple test for the server_txt_version property
"""
# Build a server
server = build_real_server(
main_conf={
'streaming_archiver': True,
'streaming_conninfo': 'dummy=param'})
conn.return_value.server_version = 80300
assert server.streaming.server_txt_version == '8.3.0'
conn.return_value.server_version = 90000
assert server.streaming.server_txt_version == '9.0.0'
conn.return_value.server_version = 90005
assert server.streaming.server_txt_version == '9.0.5'
conn.return_value.server_version = 100201
assert server.streaming.server_txt_version == '10.2.1'
conn.return_value.server_version = 101811
assert server.streaming.server_txt_version == '10.18.11'
conn.return_value.server_version = 0
assert server.streaming.server_txt_version == '0.0.0'
开发者ID:zacchiro,项目名称:barman,代码行数:27,代码来源:test_postgres.py
示例16: test_get_streaming_remote_status
def test_get_streaming_remote_status(self, conn):
"""
simple test for the get_configuration_files method
"""
# Build a server
server = build_real_server(
main_conf={
'streaming_archiver': True,
'streaming_conninfo': 'dummy=param'})
# Working streaming connection
conn.return_value.server_version = 90300
result = server.streaming.get_remote_status()
assert result['streaming'] is True
# Working non-streaming connection
cursor_mock = conn.return_value.cursor.return_value
cursor_mock.execute.assert_called_once_with("IDENTIFY_SYSTEM")
conn.reset_mock()
cursor_mock.execute.side_effect = psycopg2.ProgrammingError
result = server.streaming.get_remote_status()
assert result['streaming'] is False
# Connection failed
cursor_mock.execute.assert_called_once_with("IDENTIFY_SYSTEM")
conn.reset_mock()
conn.side_effect = PostgresConnectionError
result = server.streaming.get_remote_status()
assert result['streaming'] is None
开发者ID:zacchiro,项目名称:barman,代码行数:29,代码来源:test_postgres.py
示例17: test_streaming_server_txt_version
def test_streaming_server_txt_version(self, conn_mock):
"""
simple test for the server_txt_version property
"""
# Build a server
server = build_real_server(main_conf={"streaming_archiver": True, "streaming_conninfo": "dummy=param"})
# Connection error
conn_mock.side_effect = PostgresConnectionError
assert server.streaming.server_txt_version is None
# Good connection
conn_mock.side_effect = None
conn_mock.return_value.server_version = 80300
assert server.streaming.server_txt_version == "8.3.0"
conn_mock.return_value.server_version = 90000
assert server.streaming.server_txt_version == "9.0.0"
conn_mock.return_value.server_version = 90005
assert server.streaming.server_txt_version == "9.0.5"
conn_mock.return_value.server_version = 100201
assert server.streaming.server_txt_version == "10.2.1"
conn_mock.return_value.server_version = 101811
assert server.streaming.server_txt_version == "10.18.11"
conn_mock.return_value.server_version = 0
assert server.streaming.server_txt_version == "0.0.0"
开发者ID:aydantasdemir,项目名称:barman,代码行数:31,代码来源:test_postgres.py
示例18: test_recover_basebackup_copy
def test_recover_basebackup_copy(self, rsync_pg_mock, tmpdir):
"""
Test the copy of a content of a backup during a recovery
:param rsync_pg_mock: Mock rsync object for the purpose if this test
"""
# Build basic folder/files structure
dest = tmpdir.mkdir('destination')
server = testing_helpers.build_real_server()
backup_info = testing_helpers.build_test_backup_info(
server=server,
tablespaces=[('tbs1', 16387, '/fake/location')])
# Build a executor
executor = RecoveryExecutor(server.backup_manager)
executor.config.tablespace_bandwidth_limit = {'tbs1': ''}
executor.config.bandwidth_limit = 10
executor.basebackup_copy(
backup_info, dest.strpath, tablespaces=None)
rsync_pg_mock.assert_called_with(
network_compression=False, bwlimit=10, ssh=None, path=None,
exclude_and_protect=['/pg_tblspc/16387'])
rsync_pg_mock.assert_any_call(
network_compression=False, bwlimit='', ssh=None, path=None,
check=True)
rsync_pg_mock.return_value.smart_copy.assert_any_call(
'/some/barman/home/main/base/1234567890/16387/',
'/fake/location', None)
rsync_pg_mock.return_value.smart_copy.assert_called_with(
'/some/barman/home/main/base/1234567890/data/',
dest.strpath, None)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:29,代码来源:test_recovery_executor.py
示例19: test_get_wal_info
def test_get_wal_info(self, get_wal_mock, tmpdir):
"""
Basic test for get_wal_info method
Test the wals per second and total time in seconds values.
:return:
"""
# Build a test server with a test path
server = build_real_server(global_conf={
'barman_home': tmpdir.strpath
})
# Mock method get_wal_until_next_backup for returning a list of
# 3 fake WAL. the first one is the start and stop WAL of the backup
wal_list = [
WalFileInfo.from_xlogdb_line(
"000000010000000000000002\t16777216\t1434450086.53\tNone\n"),
WalFileInfo.from_xlogdb_line(
"000000010000000000000003\t16777216\t1434450087.54\tNone\n"),
WalFileInfo.from_xlogdb_line(
"000000010000000000000004\t16777216\t1434450088.55\tNone\n")]
get_wal_mock.return_value = wal_list
backup_info = build_test_backup_info(
server=server,
begin_wal=wal_list[0].name,
end_wal=wal_list[0].name)
backup_info.save()
# Evaluate total time in seconds:
# last_wal_timestamp - first_wal_timestamp
wal_total_seconds = wal_list[-1].time - wal_list[0].time
# Evaluate the wals_per_second value:
# wals_in_backup + wals_until_next_backup / total_time_in_seconds
wals_per_second = len(wal_list) / wal_total_seconds
wal_info = server.get_wal_info(backup_info)
assert wal_info
assert wal_info['wal_total_seconds'] == wal_total_seconds
assert wal_info['wals_per_second'] == wals_per_second
开发者ID:moench-tegeder,项目名称:barman,代码行数:35,代码来源:test_server.py
示例20: test_drop_repslot
def test_drop_repslot(self, capsys):
"""
Test the 'drop_repslot' method of the Postgres
class
"""
# No operation if there is no streaming connection
server = build_real_server()
server.streaming = None
assert server.drop_repslot() is None
# No operation if the slot name is empty
server.streaming = MagicMock()
server.config.slot_name = None
server.streaming.server_version = 90400
assert server.drop_repslot() is None
# If there is a streaming connection and the replication
# slot is defined, then the replication slot should be
# created
server.config.slot_name = 'test_repslot'
server.streaming.server_version = 90400
server.drop_repslot()
drop_repslot = server.streaming.drop_repslot
drop_repslot.assert_called_with('test_repslot')
# If the replication slot doesn't exist
# check that the underlying exception is correctly managed
drop_repslot.side_effect = PostgresInvalidReplicationSlot
server.drop_repslot()
drop_repslot.assert_called_with('test_repslot')
out, err = capsys.readouterr()
assert "Replication slot 'test_repslot' does not exist" in err
开发者ID:moench-tegeder,项目名称:barman,代码行数:33,代码来源:test_server.py
注:本文中的testing_helpers.build_real_server函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论