本文整理汇总了Python中salttesting.mock.patch.object函数的典型用法代码示例。如果您正苦于以下问题：Python中patch.object函数的具体用法是什么？patch.object怎么用？有哪些patch.object的使用示例？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了patch.object函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_genrepo
def test_genrepo(self):
    '''
    Test that ``winrepo.genrepo`` refreshes the winrepo.p file of the
    repository.

    Three outcomes are checked: the repository directory is reported as
    missing, a dry run with ``test=True`` (``result`` is ``None``), and
    real runs with ``test=False`` (``result`` is ``True``).
    '''
    ret = {'name': 'salt',
           'changes': {},
           'result': False,
           'comment': ''}

    # One side-effect value is consumed per ``os.path.exists`` call: only
    # the very first call reports that the path does not exist.
    exists_mock = MagicMock(side_effect=[False, True, True, True, True, True,
                                         True])
    with patch.object(os.path, 'exists', exists_mock):
        ret.update({'comment': 'missing /srv/salt/win/repo'})
        self.assertDictEqual(winrepo.genrepo('salt'), ret)

        config_mock = MagicMock(
            return_value={'win_repo': 'salt',
                          'win_repo_mastercachefile': 'abc'})
        with patch.object(salt.config, 'master_config', config_mock):
            stat_mock = MagicMock(return_value=[0, 1, 2, 3, 4, 5, 6, 7, 8])
            with patch.object(os, 'stat', stat_mock):
                # An empty walk means no repository files are found.
                walk_mock = MagicMock(return_value=[])
                with patch.object(os, 'walk', walk_mock):
                    with patch.dict(winrepo.__opts__, {"test": True}):
                        ret.update({'comment': '', 'result': None})
                        self.assertDictEqual(winrepo.genrepo('salt'), ret)

                    with patch.dict(winrepo.__opts__, {"test": False}):
                        ret.update({'result': True})
                        self.assertDictEqual(winrepo.genrepo('salt'), ret)

                        # Second positional argument set to True: the
                        # (empty) generated repo shows up in ``changes``.
                        ret.update({'changes': {'winrepo': []}})
                        self.assertDictEqual(winrepo.genrepo('salt', True),
                                             ret)
开发者ID:DaveQB,项目名称:salt,代码行数:32,代码来源:winrepo_test.py
示例2: test_usage
def test_usage(self):
    '''
    Test that ``btrfs.usage`` shows on which disk the chunks are allocated.

    The mocked ``cmd.run_all`` stdout is varied so that the result is
    built from ``_usage_specific``, ``_usage_unallocated`` and
    ``_usage_overall`` in turn.
    '''
    # Plain section header: the result is what ``_usage_specific`` returns.
    cmd_mock = MagicMock(return_value={'retcode': 1,
                                       'stderr': '',
                                       'stdout': 'Salt'})
    with patch.dict(btrfs.__salt__, {'cmd.run_all': cmd_mock}):
        specific_mock = MagicMock(return_value={'Salt': 'salt'})
        with patch.object(btrfs, '_usage_specific', specific_mock):
            self.assertDictEqual(btrfs.usage('/dev/sda1'),
                                 {'Salt': 'salt'})

    # "Unallocated:" header: reported under the 'unallocated' key.
    cmd_mock = MagicMock(return_value={'retcode': 1,
                                       'stderr': '',
                                       'stdout': 'Unallocated:\n'})
    with patch.dict(btrfs.__salt__, {'cmd.run_all': cmd_mock}):
        unallocated_mock = MagicMock(return_value={'/dev/sda1': True})
        with patch.object(btrfs, '_usage_unallocated', unallocated_mock):
            self.assertDictEqual(btrfs.usage('/dev/sda1'),
                                 {'unallocated': {'/dev/sda1': True}})

    # "Overall:" header: reported under the 'overall' key.
    cmd_mock = MagicMock(return_value={'retcode': 1,
                                       'stderr': '',
                                       'stdout': 'Overall:\n'})
    with patch.dict(btrfs.__salt__, {'cmd.run_all': cmd_mock}):
        overall_mock = MagicMock(return_value={'/dev/sda1': True})
        with patch.object(btrfs, '_usage_overall', overall_mock):
            self.assertDictEqual(btrfs.usage('/dev/sda1'),
                                 {'overall': {'/dev/sda1': True}})
开发者ID:DaveQB,项目名称:salt,代码行数:30,代码来源:btrfs_test.py
示例3: test_get_bond
def test_get_bond(self):
    '''
    Test that ``rh_ip.get_bond`` returns the content of a bond script.

    Both the path construction and the file read are mocked, so the
    value handed back by ``_read_file`` must be returned unchanged.
    '''
    with patch.object(os.path, 'join', return_value='A'):
        with patch.object(rh_ip, '_read_file', return_value='A'):
            self.assertEqual(rh_ip.get_bond('iface'), 'A')
开发者ID:shineforever,项目名称:ops,代码行数:7,代码来源:rh_ip_test.py
示例4: test_pkg
def test_pkg(self):
    '''
    Test that ``state.pkg`` executes a packaged state run.

    Covers a missing package file, hash values that yield an empty
    result, and two runs with the archive/JSON helpers mocked.
    '''
    # Only the first ``os.path.isfile`` call reports a missing file.
    isfile_mock = MagicMock(side_effect=[False, True, True, True, True, True])
    with patch.object(os.path, 'isfile', isfile_mock):
        self.assertEqual(state.pkg("/tmp/state_pkg.tgz", "", "md5"), {})

        # First computed hash is False, every later one is 0.
        hash_mock = MagicMock(side_effect=[False, 0, 0, 0, 0])
        with patch.object(salt.utils, 'get_hash', hash_mock):
            self.assertDictEqual(state.pkg("/tmp/state_pkg.tgz", "", "md5"),
                                 {})

            self.assertDictEqual(state.pkg("/tmp/state_pkg.tgz", 0, "md5"),
                                 {})

            # NOTE(review): MockTarFile / MockJson are defined elsewhere in
            # the test module; ``flag`` presumably switches the mocked JSON
            # loader between two behaviours -- confirm there.
            MockTarFile.path = ""
            MockJson.flag = True
            with patch('salt.utils.fopen', mock_open()):
                self.assertListEqual(state.pkg("/tmp/state_pkg.tgz",
                                               0,
                                               "md5"),
                                     [True])

            MockTarFile.path = ""
            MockJson.flag = False
            with patch('salt.utils.fopen', mock_open()):
                self.assertTrue(state.pkg("/tmp/state_pkg.tgz",
                                          0, "md5"))
开发者ID:DaveQB,项目名称:salt,代码行数:29,代码来源:state_test.py
示例5: test_restart
def test_restart(self):
    '''
    Test that ``launchctl.restart`` restarts the named service.

    ``stop`` and ``start`` are mocked; with ``start`` returning True the
    restart is expected to be truthy.
    '''
    with patch.object(launchctl, 'stop', return_value=None):
        with patch.object(launchctl, 'start', return_value=True):
            self.assertTrue(launchctl.restart('job_label'))
开发者ID:bryson,项目名称:salt,代码行数:7,代码来源:launchctl_test.py
示例6: test_missing
def test_missing(self):
    '''
    Test ``systemd.missing``, the inverse of ``service.available``.

    The same assertions are run for three cached systemd versions
    (230, 231 and 219), the latter two with the status table overlaid by
    ``_SYSTEMCTL_STATUS_GTE_231``.
    '''
    # Look the unit up in the module-level status table at call time, so
    # the ``patch.dict`` overlays below are honoured.
    mock = MagicMock(side_effect=lambda x: _SYSTEMCTL_STATUS[x])

    # systemd < 231
    with patch.dict(systemd.__context__, {'salt.utils.systemd.version': 230}):
        with patch.object(systemd, '_systemctl_status', mock):
            self.assertFalse(systemd.missing('sshd.service'))
            self.assertTrue(systemd.missing('foo.service'))

    # systemd >= 231
    with patch.dict(systemd.__context__, {'salt.utils.systemd.version': 231}):
        with patch.dict(_SYSTEMCTL_STATUS, _SYSTEMCTL_STATUS_GTE_231):
            with patch.object(systemd, '_systemctl_status', mock):
                self.assertFalse(systemd.missing('sshd.service'))
                self.assertTrue(systemd.missing('bar.service'))

    # systemd < 231 with retcode/output changes backported (e.g. RHEL 7.3)
    with patch.dict(systemd.__context__, {'salt.utils.systemd.version': 219}):
        with patch.dict(_SYSTEMCTL_STATUS, _SYSTEMCTL_STATUS_GTE_231):
            with patch.object(systemd, '_systemctl_status', mock):
                self.assertFalse(systemd.missing('sshd.service'))
                self.assertTrue(systemd.missing('bar.service'))
开发者ID:bryson,项目名称:salt,代码行数:25,代码来源:systemd_test.py
示例7: test_get_saved_policy
def test_get_saved_policy(self):
    '''
    Test that ``iptables.get_saved_policy`` returns the saved policy for
    the specified table/chain.

    Covers a missing chain (error string), a parsed configuration that
    contains a ``policy`` entry, and one that does not (``None``).
    '''
    # No chain given: an error message is returned instead of a policy.
    self.assertEqual(iptables.get_saved_policy(table='filter', chain=None,
                                               conf_file=None,
                                               family='ipv4'),
                     'Error: Chain needs to be specified')

    with patch.object(iptables, '_parse_conf',
                      MagicMock(return_value={'filter':
                                              {'INPUT':
                                               {'policy': True}}})):
        self.assertTrue(iptables.get_saved_policy(table='filter',
                                                  chain='INPUT',
                                                  conf_file=None,
                                                  family='ipv4'))

    # The parsed chain has no 'policy' key ('policy1' instead).
    with patch.object(iptables, '_parse_conf',
                      MagicMock(return_value={'filter':
                                              {'INPUT':
                                               {'policy1': True}}})):
        self.assertIsNone(iptables.get_saved_policy(table='filter',
                                                    chain='INPUT',
                                                    conf_file=None,
                                                    family='ipv4'))
开发者ID:bryson,项目名称:salt,代码行数:26,代码来源:iptables_test.py
示例8: test_set_hwclock
def test_set_hwclock(self):
    '''
    Test the error paths of ``timezone.set_hwclock``, which sets the
    hardware clock to either UTC or localtime.

    Expects ``SaltInvocationError`` for the 'clock' and 'localtime'
    arguments with Solaris/sparc grains, and ``CommandExecutionError``
    for 'UTC' when ``os.path.exists`` reports False.
    '''
    zone = 'America/Denver'

    with patch.object(timezone, 'get_zone', return_value=zone):

        with patch.dict(timezone.__grains__, {'os_family': 'Solaris',
                                              'cpuarch': 'sparc'}):
            self.assertRaises(
                SaltInvocationError,
                timezone.set_hwclock,
                'clock'
            )
            self.assertRaises(
                SaltInvocationError,
                timezone.set_hwclock,
                'localtime'
            )

        with patch.dict(timezone.__grains__,
                        {'os_family': 'DoesNotMatter'}):
            with patch.object(os.path, 'exists', return_value=False):
                self.assertRaises(
                    CommandExecutionError,
                    timezone.set_hwclock,
                    'UTC'
                )
开发者ID:HowardMei,项目名称:saltstack,代码行数:28,代码来源:timezone_test.py
示例9: test_verify_log
def test_verify_log(self):
    """
    Test that ``verify_log`` warns about insecure log levels.

    The warning is expected exactly once for an unknown level, for
    "trace" and for a missing ``log_level``; it must not be emitted for
    "info".
    """
    message = "Insecure logging configuration detected! Sensitive data may be logged."

    mock_cheese = MagicMock()
    with patch.object(log, "warning", mock_cheese):
        verify_log({"log_level": "cheeseshop"})
        mock_cheese.assert_called_once_with(message)

    mock_trace = MagicMock()
    with patch.object(log, "warning", mock_trace):
        verify_log({"log_level": "trace"})
        mock_trace.assert_called_once_with(message)

    mock_none = MagicMock()
    with patch.object(log, "warning", mock_none):
        verify_log({})
        mock_none.assert_called_once_with(message)

    mock_info = MagicMock()
    with patch.object(log, "warning", mock_info):
        verify_log({"log_level": "info"})
        mock_info.assert_not_called()
开发者ID:bryson,项目名称:salt,代码行数:25,代码来源:verify_test.py
示例10: test_provider_case_insensitive
def test_provider_case_insensitive(self):
    '''
    Ensure that both lowercase and non-lowercase provider values are
    supported by the GitFS, GitPillar and WinRepo classes.

    No assertions are needed: a rejected provider name raises
    FileserverConfigError during instantiation and fails the test.
    '''
    provider = 'GitPython'
    for role_name, role_class in (
            ('gitfs', salt.utils.gitfs.GitFS),
            ('git_pillar', salt.utils.gitfs.GitPillar),
            ('winrepo', salt.utils.gitfs.WinRepo)):

        key = '{0}_provider'.format(role_name)
        # Pretend only GitPython is available.
        with patch.object(role_class, 'verify_gitpython',
                          MagicMock(return_value=True)):
            with patch.object(role_class, 'verify_pygit2',
                              MagicMock(return_value=False)):
                with patch.object(role_class, 'verify_dulwich',
                                  MagicMock(return_value=False)):
                    args = [OPTS]
                    if role_name == 'winrepo':
                        args.append('/tmp/winrepo-dir')
                    with patch.dict(OPTS, {key: provider}):
                        # Try to create an instance with uppercase letters in
                        # provider name. If it fails then a
                        # FileserverConfigError will be raised, so no assert is
                        # necessary.
                        role_class(*args)
                    # Now try to instantiate an instance with all lowercase
                    # letters. Again, no need for an assert here.
                    # NOTE(review): this runs after patch.dict has restored
                    # OPTS, so it relies on the unpatched OPTS fixture
                    # (defined elsewhere) for the provider value -- confirm.
                    role_class(*args)
开发者ID:bryson,项目名称:salt,代码行数:29,代码来源:gitfs_test.py
示例11: test_runner
def test_runner(self):
    '''
    Test that ``raet_publish.runner`` executes a runner on the master
    and returns the data from the runner function.

    Also covers the timeout path, where sending raises
    ``SaltReqTimeoutError`` and an error string is returned.
    '''
    with patch.dict(raet_publish.__opts__, {'id': 'id'}):
        with patch.object(salt.transport.Channel, 'factory', MagicMock()):
            self.assertTrue(raet_publish.runner('fun'))

    class MockFactory(object):
        '''
        Mock channel whose ``send`` always times out
        '''
        load = ''

        def send(self, load):
            '''
            Record the load, then raise SaltReqTimeoutError
            '''
            self.load = load
            raise SaltReqTimeoutError(load)

    with patch.dict(raet_publish.__opts__, {'id': 'id'}):
        with patch.object(salt.transport.Channel, 'factory',
                          MagicMock(return_value=MockFactory())):
            self.assertEqual(raet_publish.runner(1),
                             '\'1\' runner publish timed out')
开发者ID:HowardMei,项目名称:saltstack,代码行数:27,代码来源:raet_publish_test.py
示例12: test_fire_master
def test_fire_master(self,
                     salt_crypt_sauth,
                     salt_transport_channel_factory):
    '''
    Test that ``event.fire_master`` fires an event up to the master
    server.

    Covers the 'raet' transport, a non-raet transport with a preload,
    and a failure where ``MinionEvent.fire_event`` raises.

    NOTE(review): the two extra parameters are presumably injected by
    ``@patch`` decorators that are not visible here -- confirm.
    '''
    preload = {'id': 'id', 'tag': 'tag', 'data': 'data',
               'tok': 'salt', 'cmd': '_minion_event'}

    with patch.dict(event.__opts__, {'transport': 'raet',
                                     'id': 'id',
                                     'local': False}):
        with patch.object(salt_transport_channel_factory, 'send',
                          return_value=None):
            self.assertTrue(event.fire_master('data', 'tag'))

    with patch.dict(event.__opts__, {'transport': 'A',
                                     'id': 'id',
                                     'master_uri': 'localhost',
                                     'local': False}):
        with patch.object(salt_crypt_sauth, 'gen_token',
                          return_value='tok'):
            with patch.object(salt_transport_channel_factory, 'send',
                              return_value=None):
                self.assertTrue(event.fire_master('data', 'tag', preload))

    # An exception while firing the event must yield False, not propagate.
    with patch.dict(event.__opts__, {'transport': 'A', 'local': False}):
        with patch.object(salt.utils.event.MinionEvent, 'fire_event',
                          side_effect=Exception('foo')):
            self.assertFalse(event.fire_master('data', 'tag'))
开发者ID:HowardMei,项目名称:saltstack,代码行数:31,代码来源:event_test.py
示例13: test_enabled
def test_enabled(self):
    """
    Test that ``gentoo_service.enabled`` returns True if the named
    service is enabled and False otherwise, optionally restricted to the
    given runlevels.
    """
    # Service enabled in a single runlevel.
    mock = MagicMock(return_value={'name': ['default']})
    with patch.object(gentoo_service, 'get_enabled', mock):
        # service is enabled at any level
        self.assertTrue(gentoo_service.enabled('name'))
        # service is enabled at the requested runlevel
        self.assertTrue(gentoo_service.enabled('name', runlevels='default'))
        # service is enabled at a different runlevel
        self.assertFalse(gentoo_service.enabled('name', runlevels='boot'))

    # Service enabled in two runlevels.
    mock = MagicMock(return_value={'name': ['boot', 'default']})
    with patch.object(gentoo_service, 'get_enabled', mock):
        # service is enabled at any level
        self.assertTrue(gentoo_service.enabled('name'))
        # service is enabled at the requested runlevel
        self.assertTrue(gentoo_service.enabled('name', runlevels='default'))
        # service is enabled at all requested runlevels
        self.assertTrue(gentoo_service.enabled('name', runlevels=['boot', 'default']))
        # service is enabled at a different runlevel
        self.assertFalse(gentoo_service.enabled('name', runlevels='some-other-level'))
        # only one of the requested runlevels is enabled
        self.assertFalse(gentoo_service.enabled('name', runlevels=['boot', 'some-other-level']))
开发者ID:bryson,项目名称:salt,代码行数:25,代码来源:gentoo_service_test.py
示例14: test_installed
def test_installed(self):
    '''
    Test that ``win_update.installed`` installs the specified Windows
    updates.

    The search, download and install stages are mocked one after the
    other; each mock fails (False) on its first call and succeeds
    afterwards, and the expected comment grows by each stage's text.
    '''
    ret = {'name': 'salt',
           'changes': {},
           'result': False,
           'comment': ''}

    search_mock = MagicMock(side_effect=[['Saltstack', False, 5],
                                         ['Saltstack', True, 5],
                                         ['Saltstack', True, 5],
                                         ['Saltstack', True, 5]])
    with patch.object(win_update, '_search', search_mock):
        # Search fails: only the search text is reported.
        ret.update({'comment': 'Saltstack'})
        self.assertDictEqual(win_update.installed('salt'), ret)

        download_mock = MagicMock(side_effect=[['dude', False, 5],
                                               ['dude', True, 5],
                                               ['dude', True, 5]])
        with patch.object(win_update, '_download', download_mock):
            # Download fails: search text + download text.
            ret.update({'comment': 'Saltstackdude'})
            self.assertDictEqual(win_update.installed('salt'), ret)

            install_mock = MagicMock(side_effect=[['@Me', False, 5],
                                                  ['@Me', True, 5]])
            with patch.object(win_update, '_install', install_mock):
                # Install fails: 'Saltstack' + 'dude' + '@Me'.
                ret.update({'comment': 'Saltstackdude@Me'})
                self.assertDictEqual(win_update.installed('salt'), ret)

                # Every stage succeeds.
                ret.update({'changes': True, 'result': True})
                self.assertDictEqual(win_update.installed('salt'), ret)
开发者ID:DaveQB,项目名称:salt,代码行数:32,代码来源:win_update_test.py
示例15: test_user_remove
def test_user_remove(self):
    '''
    Test that ``influx.user_remove`` removes a cluster admin or a
    database user.

    Removal is expected to be falsy when the user does not exist and
    truthy when it does (with the client mocked), both with and without
    a ``database`` argument.
    '''
    with patch.object(influx, 'user_exists', return_value=False):
        self.assertFalse(influx.user_remove(name='A',
                                            user='root',
                                            password='root',
                                            host='localhost',
                                            port=8000))

        self.assertFalse(influx.user_remove(name='A',
                                            database='test',
                                            user='root',
                                            password='root',
                                            host='localhost',
                                            port=8000))

    mock_inf_db_client = MagicMock(return_value=MockInfluxDBClient())
    with patch.object(influx, '_client', mock_inf_db_client):
        with patch.object(influx, 'user_exists', return_value=True):
            self.assertTrue(influx.user_remove(name='A',
                                               user='root',
                                               password='root',
                                               host='localhost',
                                               port=8000))

            self.assertTrue(influx.user_remove(name='A',
                                               database='test',
                                               user='root',
                                               password='root',
                                               host='localhost',
                                               port=8000))
开发者ID:HowardMei,项目名称:saltstack,代码行数:33,代码来源:influx_test.py
示例16: test_latest
def test_latest(self):
    '''
    Test that ``hg.latest`` makes sure the repository is cloned to the
    given directory and is up to date.

    Covers the failure helper (no target), an existing checkout, a dry
    run with ``test=True`` and a real clone with ``test=False``.
    '''
    ret = {'changes': {}, 'comment': '', 'name': 'salt', 'result': True}

    # No target given: the result comes from the mocked ``_fail``.
    mock = MagicMock(return_value=True)
    with patch.object(hg, '_fail', mock):
        self.assertTrue(hg.latest("salt"))

    # One side-effect value is consumed per ``os.path.isdir`` call.
    mock = MagicMock(side_effect=[False, True, False, False, False, False])
    with patch.object(os.path, 'isdir', mock):
        mock = MagicMock(return_value=True)
        with patch.object(hg, '_handle_existing', mock):
            self.assertTrue(hg.latest("salt", target="c:\\salt"))

        with patch.dict(hg.__opts__, {'test': True}):
            mock = MagicMock(return_value=True)
            with patch.object(hg, '_neutral_test', mock):
                self.assertTrue(hg.latest("salt", target="c:\\salt"))

        with patch.dict(hg.__opts__, {'test': False}):
            mock = MagicMock(return_value=True)
            with patch.object(hg, '_clone_repo', mock):
                self.assertDictEqual(hg.latest("salt", target="c:\\salt"),
                                     ret)
开发者ID:bryson,项目名称:salt,代码行数:26,代码来源:hg_test.py
示例17: test_get_disabled
def test_get_disabled(self):
    '''
    Test that ``systemd.get_disabled`` returns a list of all disabled
    services.
    '''
    cmd_mock = MagicMock(return_value=_LIST_UNIT_FILES)
    # 'foo' should collide with the systemd services (as returned by
    # sd_mock) and thus not be returned by _get_sysv_services(). It doesn't
    # matter that it's not part of the _LIST_UNIT_FILES output, we just
    # want to ensure that 'foo' isn't identified as a disabled initscript
    # even though below we are mocking it to show as not enabled (since
    # only 'baz' will be considered an enabled sysv service).
    listdir_mock = MagicMock(return_value=['foo', 'bar', 'baz', 'README'])
    sd_mock = MagicMock(
        return_value=set(
            [x.replace('.service', '') for x in _SYSTEMCTL_STATUS]
        )
    )
    # Every init script except README passes the os.access check.
    access_mock = MagicMock(
        side_effect=lambda x, y: x != os.path.join(
            systemd.INITSCRIPT_PATH,
            'README'
        )
    )
    sysv_enabled_mock = MagicMock(side_effect=lambda x: x == 'baz')

    with patch.dict(systemd.__salt__, {'cmd.run': cmd_mock}):
        with patch.object(os, 'listdir', listdir_mock):
            with patch.object(systemd, '_get_systemd_services', sd_mock):
                with patch.object(os, 'access', side_effect=access_mock):
                    with patch.object(systemd, '_sysv_enabled',
                                      sysv_enabled_mock):
                        self.assertListEqual(
                            systemd.get_disabled(),
                            ['bar', 'service2', 'timer2.timer']
                        )
开发者ID:bryson,项目名称:salt,代码行数:35,代码来源:systemd_test.py
示例18: test_grains
def test_grains(self):
    '''
    Test the ``cache.grains`` runner.

    Covers the unmocked call with a minion list (empty result), a call
    with an outputter (result wrapped in a dict) and a call with
    ``outputter=None`` (raw data returned).
    '''
    mock_minion = ['Larry']
    mock_ret = {}
    self.assertEqual(cache.grains(minion=mock_minion), mock_ret)

    mock_data = 'grain stuff'
    mock_outputter = 'deprecated'

    class MockMaster(object):
        '''
        Stand-in for MasterPillarUtil that returns canned grain data
        '''
        def __init__(self, *args, **kwargs):
            pass

        def get_minion_grains(self):
            return mock_data

    mock_ret = {'outputter': mock_outputter, 'data': mock_data}
    with patch.object(salt.utils.master, 'MasterPillarUtil', MockMaster):
        self.assertEqual(cache.grains(outputter=mock_outputter), mock_ret)

    mock_outputter = None
    with patch.object(salt.utils.master, 'MasterPillarUtil', MockMaster):
        self.assertEqual(cache.grains(outputter=mock_outputter), mock_data)
开发者ID:HowardMei,项目名称:saltstack,代码行数:25,代码来源:cache_test.py
示例19: test_get_enabled
def test_get_enabled(self):
    '''
    Test that ``systemd.get_enabled`` returns a list of all enabled
    services.
    '''
    cmd_mock = MagicMock(return_value=_LIST_UNIT_FILES)
    listdir_mock = MagicMock(return_value=['foo', 'bar', 'baz', 'README'])
    sd_mock = MagicMock(
        return_value=set(
            [x.replace('.service', '') for x in _SYSTEMCTL_STATUS]
        )
    )
    # Every init script except README passes the os.access check.
    access_mock = MagicMock(
        side_effect=lambda x, y: x != os.path.join(
            systemd.INITSCRIPT_PATH,
            'README'
        )
    )
    # Only 'baz' is treated as an enabled sysv service.
    sysv_enabled_mock = MagicMock(side_effect=lambda x: x == 'baz')

    with patch.dict(systemd.__salt__, {'cmd.run': cmd_mock}):
        with patch.object(os, 'listdir', listdir_mock):
            with patch.object(systemd, '_get_systemd_services', sd_mock):
                with patch.object(os, 'access', side_effect=access_mock):
                    with patch.object(systemd, '_sysv_enabled',
                                      sysv_enabled_mock):
                        self.assertListEqual(
                            systemd.get_enabled(),
                            ['baz', 'service1', 'timer1.timer']
                        )
开发者ID:bryson,项目名称:salt,代码行数:29,代码来源:systemd_test.py
示例20: test_set_replication_enabled
def test_set_replication_enabled(self):
    '''
    Test ``solr.set_replication_enabled``, which sets the master to
    ignore poll requests from the slaves.

    The helper mocks use side-effect lists, so each successive call of
    ``set_replication_enabled`` sees the next canned value.
    '''
    with patch.object(solr, '_is_master', side_effect=[False, True, True,
                                                       True]):
        with patch.object(solr, '_get_none_or_value',
                          side_effect=[None, None, True, True, True]):
            with patch.object(solr,
                              '_get_return_dict',
                              side_effect=[{'A': 'a'}, {}]):
                with patch.object(solr, '_replication_request',
                                  return_value='A'):

                    # First call: ``_is_master`` yields False.
                    self.assertDictEqual(solr.set_replication_enabled('s'),
                                         {'A': 'a'})

                    with patch.object(solr, '_check_for_cores',
                                      return_value=True):
                        with patch.dict(solr.__opts__,
                                        {'solr.cores':
                                         MagicMock(return_value='n')}):
                            self.assertEqual(solr.set_replication_enabled
                                             ('s'), {})

                    # Remaining calls return the mocked
                    # ``_replication_request`` value.
                    self.assertEqual(solr.set_replication_enabled('s'), 'A')

                    self.assertEqual(solr.set_replication_enabled(False),
                                     'A')
开发者ID:DaveQB,项目名称:salt,代码行数:29,代码来源:solr_test.py
注：本文中的salttesting.mock.patch.object函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论