本文整理汇总了Python中salt.utils.event.tagify函数的典型用法代码示例。如果您正苦于以下问题:Python tagify函数的具体用法?Python tagify怎么用?Python tagify使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了tagify函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _proc_function
def _proc_function(self, fun, low, user, tag, jid):
'''
Run this method in a multiprocess target to execute the function in a
multiprocess and fire the return data on the event bus
'''
salt.utils.daemonize()
data = {'fun': '{0}.{1}'.format(self.client, fun),
'jid': jid,
'user': user,
}
event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
self.opts['transport'],
opts=self.opts,
listen=False)
event.fire_event(data, tagify('new', base=tag))
try:
data['return'] = self.low(fun, low)
data['success'] = True
except Exception as exc:
data['return'] = 'Exception occurred in {0} {1}: {2}: {3}'.format(
self.client,
fun,
exc.__class__.__name__,
exc,
)
data['success'] = False
data['user'] = user
event.fire_event(data, tagify('ret', base=tag))
# if we fired an event, make sure to delete the event object.
# This will ensure that we call destroy, which will do the 0MQ linger
del event
开发者ID:wikimedia,项目名称:operations-debs-salt,代码行数:35,代码来源:mixins.py
示例2: _proc_runner
def _proc_runner(self, fun, low, user, tag, jid):
'''
Run this method in a multiprocess target to execute the runner in a
multiprocess and fire the return data on the event bus
'''
salt.utils.daemonize()
event = salt.utils.event.MasterEvent(self.opts['sock_dir'])
data = {'fun': 'runner.{0}'.format(fun),
'jid': jid,
'user': user,
}
event.fire_event(data, tagify('new', base=tag))
try:
data['return'] = self.low(fun, low)
data['success'] = True
except Exception as exc:
data['return'] = 'Exception occured in runner {0}: {1}: {2}'.format(
fun,
exc.__class__.__name__,
exc,
)
data['success'] = False
data['user'] = user
event.fire_event(data, tagify('ret', base=tag))
# this is a workaround because process reaping is defeating 0MQ linger
time.sleep(2.0) # delay so 0MQ event gets out before runner process reaped
开发者ID:Anbcorp,项目名称:salt,代码行数:27,代码来源:runner.py
示例3: _disbatch_local
def _disbatch_local(self, chunk):
'''
Disbatch local client commands
'''
chunk_ret = {}
f_call = salt.utils.format_call(self.saltclients['local'], chunk)
# fire a job off
try:
ping_pub_data = self.saltclients['local'](chunk['tgt'],
'test.ping',
[],
expr_form=f_call['kwargs']['expr_form'])
pub_data = self.saltclients['local'](*f_call.get('args', ()), **f_call.get('kwargs', {}))
except EauthAuthenticationError:
raise tornado.gen.Return('Not authorized to run this job')
# if the job didn't publish, lets not wait around for nothing
# TODO: set header??
if 'jid' not in pub_data:
raise tornado.gen.Return('No minions matched the target. No command was sent, no jid was assigned.')
# get the tag that we are looking for
ping_tag = tagify([ping_pub_data['jid'], 'ret'], 'job')
ret_tag = tagify([pub_data['jid'], 'ret'], 'job')
# seed minions_remaining with the pub_data
minions_remaining = pub_data['minions']
ret_event = self.application.event_listener.get_event(self, tag=ret_tag)
ping_event = self.application.event_listener.get_event(self, tag=ping_tag)
# while we are waiting on all the mininons
while len(minions_remaining) > 0 or not self.min_syndic_wait_done():
event_future = yield Any([ret_event, ping_event])
try:
event = event_future.result()
# if you hit a timeout, just stop waiting ;)
except TimeoutException:
break
# If someone returned from the ping, and they are new-- add to minions_remaining
if event_future == ping_event:
ping_id = event['data']['id']
if ping_id not in chunk_ret and ping_id not in minions_remaining:
minions_remaining.append(ping_id)
ping_event = self.application.event_listener.get_event(self, tag=ping_tag)
# if it is a ret future, its just a regular return
else:
chunk_ret[event['data']['id']] = event['data']['return']
# its possible to get a return that wasn't in the minion_remaining list
try:
minions_remaining.remove(event['data']['id'])
except ValueError:
pass
ret_event = self.application.event_listener.get_event(self, tag=ret_tag)
raise tornado.gen.Return(chunk_ret)
开发者ID:DavideyLee,项目名称:salt,代码行数:57,代码来源:saltnado.py
示例4: testMinionStatsWrongMissingTag
def testMinionStatsWrongMissingTag(self):
"""
Test Minion Stats requests with unknown and missing tag (A3, A4)
"""
console.terse("{0}\n".format(self.testMinionStatsWrongMissingTag.__doc__))
# Bootstrap
self.addEnterDeed("TestOptsSetupMinion")
self.addEnterDeed("SaltRaetManorLaneSetup")
self.addEnterDeed("SaltRaetRoadStackSetup")
self.addEnterDeed("StatsMinionTestSetup")
act = self.addRecurDeed("SaltRaetStatsEventerMinion")
self.resolve() # resolve House, Framer, Frame, Acts, Actors
self.frame.enter()
# Prepare
# add a test stat key-value
roadStack = self.store.fetch('.salt.road.manor.stack')
laneStack = self.store.fetch('.salt.lane.manor.stack')
roadStack.value.stats = odict({'test_road_stats_event': 111})
laneStack.value.stats = odict({'test_lane_stats_event': 222})
# ensure stats are equal to expected
self.assertDictEqual(roadStack.value.stats, {'test_road_stats_event': 111})
self.assertDictEqual(laneStack.value.stats, {'test_lane_stats_event': 222})
# add stats request
testStack = self.store.fetch('.salt.test.road.stack').value
statsReq = self.store.fetch('.salt.stats.event_req').value
tag = 'salt/unknown/tag'
self.assertNotEqual(tag, tagify('lane', 'stats'))
self.assertNotEqual(tag, tagify('road', 'stats'))
minionName = roadStack.value.local.name
masterName = testStack.local.name
# unknown tag in stats request
statsReq.append({'route': {'dst': (minionName, None, 'stats_req'),
'src': (masterName, None, None)},
'tag': tag})
# no tag in stats request
statsReq.append({'route': {'dst': (minionName, None, 'stats_req'),
'src': (masterName, None, None)}})
# Test
self.frame.recur() # run in frame
# Check
self.assertEqual(len(testStack.rxMsgs), 0)
testStack.serviceAll()
self.assertEqual(len(testStack.rxMsgs), 0)
# Close active stacks servers
act.actor.lane_stack.value.server.close()
act.actor.road_stack.value.server.close()
testStack = self.store.fetch('.salt.test.road.stack')
if testStack:
testStack.value.server.close()
开发者ID:DaveQB,项目名称:salt,代码行数:55,代码来源:test_stats.py
示例5: all_returns
def all_returns(self,
jid,
finish_futures=None,
minions_remaining=None,
):
'''
Return a future which will complete once all returns are completed
(according to minions_remaining), or one of the passed in "finish_futures" completes
'''
if finish_futures is None:
finish_futures = []
if minions_remaining is None:
minions_remaining = []
ret_tag = tagify([jid, 'ret'], 'job')
chunk_ret = {}
while True:
ret_event = self.application.event_listener.get_event(self,
tag=ret_tag,
)
f = yield Any([ret_event] + finish_futures)
if f in finish_futures:
raise tornado.gen.Return(chunk_ret)
event = f.result()
chunk_ret[event['data']['id']] = event['data']['return']
# its possible to get a return that wasn't in the minion_remaining list
try:
minions_remaining.remove(event['data']['id'])
except ValueError:
pass
if len(minions_remaining) == 0:
raise tornado.gen.Return(chunk_ret)
开发者ID:wikimedia,项目名称:operations-debs-salt,代码行数:32,代码来源:saltnado.py
示例6: update
def update(self):
"""
COPIED FROM SALT
changed: salt.utils.fopen() call opens the file in binary mode instead.
"""
# data for the fileserver event
data = {"changed": self.clear_old_remotes(), "backend": "gitfs"}
if self.fetch_remotes():
data["changed"] = True
if data["changed"] is True or not os.path.isfile(self.env_cache):
env_cachedir = os.path.dirname(self.env_cache)
if not os.path.exists(env_cachedir):
os.makedirs(env_cachedir)
new_envs = self.envs(ignore_cache=True)
serial = salt.payload.Serial(self.opts)
with salt.utils.fopen(self.env_cache, "wb+") as fp_:
fp_.write(serial.dumps(new_envs))
logger.trace("Wrote env cache data to {0}".format(self.env_cache))
# if there is a change, fire an event
if self.opts.get("fileserver_events", False):
event = salt.utils.event.get_event(
"master", self.opts["sock_dir"], self.opts["transport"], opts=self.opts, listen=False
)
event.fire_event(data, tagify(["gitfs", "update"], prefix="fileserver"))
try:
salt.fileserver.reap_fileserver_cache_dir(self.hash_cachedir, self.find_file)
except (OSError, IOError):
# Hash file won't exist if no files have yet been served up
pass
开发者ID:clarkperkins,项目名称:stackdio,代码行数:33,代码来源:utils.py
示例7: get_stats
def get_stats(estate=None, stack='road'):
'''
Print the stack stats
estate : None
The name of the target estate. Master stats would be requested by default
stack : 'road'
Show stats on either road or lane stack
Allowed values are 'road' or 'lane'.
CLI Example:
.. code-block:: bash
salt-run manage.get_stats [estate=alpha_minion] [stack=lane]
'''
conf_file = __opts__['conf_file']
opts = salt.config.client_config(conf_file)
if opts['transport'] == 'raet':
tag = tagify(stack, 'stats')
event = salt.utils.raetevent.StatsEvent(__opts__, __opts__['sock_dir'], tag=tag, estate=estate)
stats = event.get_event(wait=60, tag=tag)
else:
#TODO: implement 0MQ analog
stats = 'Not implemented'
return stats
开发者ID:dmyerscough,项目名称:salt,代码行数:28,代码来源:manage.py
示例8: update
def update():
'''
Execute an hg pull on all of the repos
'''
# data for the fileserver event
data = {'changed': False,
'backend': 'hgfs'}
pid = os.getpid()
data['changed'] = purge_cache()
for repo in init():
repo['repo'].open()
lk_fn = os.path.join(repo['repo'].root(), 'update.lk')
with salt.utils.fopen(lk_fn, 'w+') as fp_:
fp_.write(str(pid))
curtip = repo['repo'].tip()
try:
repo['repo'].pull()
except Exception as exc:
log.error(
'Exception {0} caught while updating hgfs remote {1}'
.format(exc, repo['uri']),
exc_info=log.isEnabledFor(logging.DEBUG)
)
else:
newtip = repo['repo'].tip()
if curtip[1] != newtip[1]:
data['changed'] = True
repo['repo'].close()
try:
os.remove(lk_fn)
except (IOError, OSError):
pass
env_cache = os.path.join(__opts__['cachedir'], 'hgfs/envs.p')
if data.get('changed', False) is True or not os.path.isfile(env_cache):
env_cachedir = os.path.dirname(env_cache)
if not os.path.exists(env_cachedir):
os.makedirs(env_cachedir)
new_envs = envs(ignore_cache=True)
serial = salt.payload.Serial(__opts__)
with salt.utils.fopen(env_cache, 'w+') as fp_:
fp_.write(serial.dumps(new_envs))
log.trace('Wrote env cache data to {0}'.format(env_cache))
# if there is a change, fire an event
if __opts__.get('fileserver_events', False):
event = salt.utils.event.get_event(
'master',
__opts__['sock_dir'],
__opts__['transport'],
listen=False)
event.fire_event(data, tagify(['hgfs', 'update'], prefix='fileserver'))
try:
salt.fileserver.reap_fileserver_cache_dir(
os.path.join(__opts__['cachedir'], 'hgfs/hash'),
find_file
)
except (IOError, OSError):
# Hash file won't exist if no files have yet been served up
pass
开发者ID:AccelerationNet,项目名称:salt,代码行数:60,代码来源:hgfs.py
示例9: reject
def reject(self, match):
'''
Reject a specified host's public key or keys based on a glob
'''
matches = self.name_match(match)
if 'minions_pre' in matches:
for key in matches['minions_pre']:
try:
shutil.move(
os.path.join(
self.opts['pki_dir'],
'minions_pre',
key),
os.path.join(
self.opts['pki_dir'],
'minions_rejected',
key)
)
eload = {'result': True,
'act': 'reject',
'id': key}
self.event.fire_event(eload, tagify(prefix='key'))
except (IOError, OSError):
pass
self.check_minion_cache()
salt.crypt.dropfile(self.opts['cachedir'], self.opts['user'])
return self.name_match(match)
开发者ID:jslatts,项目名称:salt,代码行数:27,代码来源:key.py
示例10: accept
def accept(self, match):
'''
Accept a specified host's public key based on name or keys based on
glob
'''
matches = self.name_match(match)
if 'minions_pre' in matches:
for key in matches['minions_pre']:
try:
shutil.move(
os.path.join(
self.opts['pki_dir'],
'minions_pre',
key),
os.path.join(
self.opts['pki_dir'],
'minions',
key)
)
eload = {'result': True,
'act': 'accept',
'id': key}
self.event.fire_event(eload, tagify(prefix='key'))
except (IOError, OSError):
pass
return self.name_match(match)
开发者ID:jslatts,项目名称:salt,代码行数:26,代码来源:key.py
示例11: cmd_sync
def cmd_sync(self, low, timeout=None):
'''
Execute a runner function synchronously; eauth is respected
This function requires that :conf_master:`external_auth` is configured
and the user is authorized to execute runner functions: (``@runner``).
.. code-block:: python
runner.eauth_sync({
'fun': 'jobs.list_jobs',
'username': 'saltdev',
'password': 'saltdev',
'eauth': 'pam',
})
'''
reformatted_low = self._reformat_low(low)
job = self.master_call(**reformatted_low)
ret_tag = tagify('ret', base=job['tag'])
timelimit = time.time() + (timeout or 300)
while True:
ret = self.event.get_event(full=True)
if ret is None:
if time.time() > timelimit:
raise salt.exceptions.SaltClientTimeout(
"RunnerClient job '{0}' timed out".format(job['jid']),
jid=job['jid'])
else:
continue
if ret['tag'] == ret_tag:
return ret['data']['return']
开发者ID:wikimedia,项目名称:operations-debs-salt,代码行数:33,代码来源:runner.py
示例12: reject_all
def reject_all(self):
'''
Reject all keys in pre
'''
keys = self.list_keys()
for key in keys['minions_pre']:
try:
shutil.move(
os.path.join(
self.opts['pki_dir'],
'minions_pre',
key),
os.path.join(
self.opts['pki_dir'],
'minions_rejected',
key)
)
eload = {'result': True,
'act': 'reject',
'id': key}
self.event.fire_event(eload, tagify(prefix='key'))
except (IOError, OSError):
pass
self.check_minion_cache()
salt.crypt.dropfile(self.opts['cachedir'], self.opts['user'])
return self.list_keys()
开发者ID:jslatts,项目名称:salt,代码行数:26,代码来源:key.py
示例13: get_stats
def get_stats(estate=None, stack="road"):
"""
Print the stack stats
estate : None
The name of the target estate. Master stats would be requested by default
stack : 'road'
Show stats on either road or lane stack
Allowed values are 'road' or 'lane'.
CLI Example:
.. code-block:: bash
salt-run manage.get_stats [estate=alpha_minion] [stack=lane]
"""
conf_file = __opts__["conf_file"]
opts = salt.config.client_config(conf_file)
if opts["transport"] == "raet":
tag = tagify(stack, "stats")
event = salt.utils.raetevent.StatsEvent(__opts__, __opts__["sock_dir"], tag=tag, estate=estate)
stats = event.get_event(wait=60, tag=tag)
else:
# TODO: implement 0MQ analog
stats = "Not implemented"
return stats
开发者ID:DaveQB,项目名称:salt,代码行数:28,代码来源:manage.py
示例14: process_queue
def process_queue(queue, quantity=1, backend='sqlite'):
'''
Pop items off a queue and create an event on the Salt event bus to be
processed by a Reactor.
CLI Example:
.. code-block:: bash
salt-run queue.process_queue myqueue
salt-run queue.process_queue myqueue 6
salt-run queue.process_queue myqueue all backend=sqlite
'''
# get ready to send an event
event = salt.utils.event.get_event(
'master',
__opts__['sock_dir'],
__opts__['transport'],
opts=__opts__,
listen=False)
try:
items = pop(queue=queue, quantity=quantity, backend=backend)
except SaltInvocationError as exc:
error_txt = '{0}'.format(exc)
__progress__(error_txt)
return False
data = {'items': items,
'backend': backend,
'queue': queue,
}
event.fire_event(data, tagify([queue, 'process'], prefix='queue'))
开发者ID:DavideyLee,项目名称:salt,代码行数:32,代码来源:queue.py
示例15: delete_key
def delete_key(self, match=None, match_dict=None):
'''
Delete public keys. If "match" is passed, it is evaluated as a glob.
Pre-gathered matches can also be passed via "match_dict".
'''
if match is not None:
matches = self.name_match(match)
elif match_dict is not None and isinstance(match_dict, dict):
matches = match_dict
else:
matches = {}
for status, keys in matches.items():
for key in keys:
try:
os.remove(os.path.join(self.opts['pki_dir'], status, key))
eload = {'result': True,
'act': 'delete',
'id': key}
self.event.fire_event(eload, tagify(prefix='key'))
except (OSError, IOError):
pass
self.check_minion_cache()
salt.crypt.dropfile(self.opts['cachedir'], self.opts['user'])
return (
self.name_match(match) if match is not None
else self.dict_match(matches)
)
开发者ID:1mentat,项目名称:salt,代码行数:27,代码来源:key.py
示例16: update
def update():
'''
Execute an svn update on all of the repos
'''
# data for the fileserver event
data = {'changed': False,
'backend': 'svnfs'}
pid = os.getpid()
data['changed'] = purge_cache()
for repo in init():
lk_fn = os.path.join(repo['repo'], 'update.lk')
with salt.utils.fopen(lk_fn, 'w+') as fp_:
fp_.write(str(pid))
old_rev = _rev(repo)
try:
CLIENT.update(repo['repo'])
except pysvn._pysvn.ClientError as exc:
log.error(
'Error updating svnfs remote {0} (cachedir: {1}): {2}'
.format(repo['uri'], repo['cachedir'], exc)
)
try:
os.remove(lk_fn)
except (OSError, IOError):
pass
new_rev = _rev(repo)
if any((x is None for x in (old_rev, new_rev))):
# There were problems getting the revision ID
continue
if new_rev != old_rev:
data['changed'] = True
env_cache = os.path.join(__opts__['cachedir'], 'svnfs/envs.p')
if data.get('changed', False) is True or not os.path.isfile(env_cache):
env_cachedir = os.path.dirname(env_cache)
if not os.path.exists(env_cachedir):
os.makedirs(env_cachedir)
new_envs = envs(ignore_cache=True)
serial = salt.payload.Serial(__opts__)
with salt.utils.fopen(env_cache, 'w+') as fp_:
fp_.write(serial.dumps(new_envs))
log.trace('Wrote env cache data to {0}'.format(env_cache))
# if there is a change, fire an event
if __opts__.get('fileserver_events', False):
event = salt.utils.event.get_event(
'master',
__opts__['sock_dir'],
__opts__['transport'],
listen=False)
event.fire_event(data, tagify(['svnfs', 'update'], prefix='fileserver'))
try:
salt.fileserver.reap_fileserver_cache_dir(
os.path.join(__opts__['cachedir'], 'svnfs/hash'),
find_file
)
except (IOError, OSError):
# Hash file won't exist if no files have yet been served up
pass
开发者ID:AccelerationNet,项目名称:salt,代码行数:60,代码来源:svnfs.py
示例17: fire_event
def fire_event(self, data, tag):
'''
fires event with data and tag
This only works if api is running with same user permissions as master
Need to convert this to a master call with appropriate authentication
'''
return self.event.fire_event(data, tagify(tag, 'wui'))
开发者ID:AccelerationNet,项目名称:salt,代码行数:8,代码来源:api.py
示例18: update
def update():
'''
Execute a git pull on all of the repos
'''
# data for the fileserver event
data = {'changed': False,
'backend': 'gitfs'}
provider = _get_provider()
pid = os.getpid()
data['changed'] = purge_cache()
repos = init()
for repo in repos:
origin = repo.remotes[0]
if provider == 'gitpython':
working_dir = repo.working_dir
elif provider == 'pygit2':
working_dir = repo.workdir
lk_fn = os.path.join(working_dir, 'update.lk')
with salt.utils.fopen(lk_fn, 'w+') as fp_:
fp_.write(str(pid))
try:
if provider == 'gitpython':
for fetch in origin.fetch():
if fetch.old_commit is not None:
data['changed'] = True
elif provider == 'pygit2':
fetch = origin.fetch()
if fetch.get('received_objects', 0):
data['changed'] = True
except Exception as exc:
log.warning(
'Exception caught while fetching: {0}'.format(exc)
)
try:
os.remove(lk_fn)
except (IOError, OSError):
pass
env_cache = os.path.join(__opts__['cachedir'], 'gitfs/envs.p')
if data.get('changed', False) is True or not os.path.isfile(env_cache):
new_envs = envs(ignore_cache=True)
serial = salt.payload.Serial(__opts__)
with salt.utils.fopen(env_cache, 'w+') as fp_:
fp_.write(serial.dumps(new_envs))
log.trace('Wrote env cache data to {0}'.format(env_cache))
# if there is a change, fire an event
if __opts__.get('fileserver_events', False):
event = salt.utils.event.MasterEvent(__opts__['sock_dir'])
event.fire_event(data, tagify(['gitfs', 'update'], prefix='fileserver'))
try:
salt.fileserver.reap_fileserver_cache_dir(
os.path.join(__opts__['cachedir'], 'gitfs/hash'),
find_file
)
except (IOError, OSError):
# Hash file won't exist if no files have yet been served up
pass
开发者ID:penta-srl,项目名称:salt,代码行数:58,代码来源:gitfs.py
示例19: job_not_running
def job_not_running(self,
jid,
tgt,
tgt_type,
minions_remaining=None,
):
'''
Return a future which will complete once jid (passed in) is no longer
running on tgt
'''
if minions_remaining is None:
minions_remaining = []
ping_pub_data = self.saltclients['local'](tgt,
'saltutil.find_job',
[jid],
expr_form=tgt_type)
ping_tag = tagify([ping_pub_data['jid'], 'ret'], 'job')
minion_running = False
while True:
try:
event = yield self.application.event_listener.get_event(self,
tag=ping_tag,
timeout=self.application.opts['gather_job_timeout'],
)
except TimeoutException:
if not minion_running:
raise tornado.gen.Return(True)
else:
ping_pub_data = self.saltclients['local'](tgt,
'saltutil.find_job',
[jid],
expr_form=tgt_type)
ping_tag = tagify([ping_pub_data['jid'], 'ret'], 'job')
minion_running = False
continue
# Minions can return, we want to see if the job is running...
if event['data'].get('return', {}) == {}:
continue
minion_running = True
id_ = event['data']['id']
if id_ not in minions_remaining:
minions_remaining.append(event['data']['id'])
开发者ID:wikimedia,项目名称:operations-debs-salt,代码行数:44,代码来源:saltnado.py
示例20: testMinionLaneStats
def testMinionLaneStats(self):
"""
Test Minion Road Stats request (A2)
"""
console.terse("{0}\n".format(self.testMinionLaneStats.__doc__))
# Bootstrap
self.addEnterDeed("TestOptsSetupMinion")
self.addEnterDeed("SaltRaetManorLaneSetup")
self.addEnterDeed("SaltRaetRoadStackSetup")
self.addEnterDeed("StatsMinionTestSetup")
act = self.addRecurDeed("SaltRaetStatsEventerMinion")
self.resolve() # resolve House, Framer, Frame, Acts, Actors
self.frame.enter()
# Prepare
# add a test stat key-value
roadStack = self.store.fetch('.salt.road.manor.stack')
laneStack = self.store.fetch('.salt.lane.manor.stack')
roadStack.value.stats = odict()
laneStack.value.stats = odict({'test_stats_event': 111})
# ensure stats are equal to expected
self.assertDictEqual(roadStack.value.stats, {})
self.assertDictEqual(laneStack.value.stats, {'test_stats_event': 111})
# add stats request
testStack = self.store.fetch('.salt.test.road.stack').value
statsReq = self.store.fetch('.salt.stats.event_req').value
tag = tagify('lane', 'stats')
minionName = roadStack.value.local.name
masterName = testStack.local.name
# lane stats request
statsReq.append({'route': {'dst': (minionName, None, 'stats_req'),
'src': (masterName, None, None)},
'tag': tag})
# Test
self.frame.recur() # run in frame
# Check
self.assertEqual(len(testStack.rxMsgs), 0)
testStack.serviceAll()
self.assertEqual(len(testStack.rxMsgs), 1)
msg, sender = testStack.rxMsgs.popleft()
self.assertDictEqual(msg, {u'route': {u'src': [ns2u(minionName), u'manor', None],
u'dst': [ns2u(masterName), None, u'event_fire']},
u'tag': ns2u(tag),
u'data': {u'test_stats_event': 111}})
# Close active stacks servers
act.actor.lane_stack.value.server.close()
act.actor.road_stack.value.server.close()
testStack = self.store.fetch('.salt.test.road.stack')
if testStack:
testStack.value.server.close()
开发者ID:DaveQB,项目名称:salt,代码行数:56,代码来源:test_stats.py
注:本文中的salt.utils.event.tagify函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论