本文整理汇总了Python中stubo.model.db.Scenario类的典型用法代码示例。如果您正苦于以下问题:Python Scenario类的具体用法?Python Scenario怎么用?Python Scenario使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Scenario类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_put_stub
def test_put_stub(self):
self.http_client.fetch(self.get_url('/stubo/api/begin/session?scenario=xslt&session=s1&mode=record'), self.stop)
response = self.wait()
self.assertEqual(response.code, 200)
self.http_client.fetch(self.get_url('/stubo/api/put/module?name=/static/cmds/tests/ext/xslt/mangler.py'),
self.stop)
response = self.wait()
self.assertEqual(response.code, 200)
self.http_client.fetch(self.get_url('/stubo/api/put/stub?session=s1&ext_module=mangler'),
callback=self.stop,
method="POST",
body="""||textMatcher||<request><a>ba</a><trans_id>1234</trans_id><b>ba</b><user uuid="xxx">joe</user><dt>2013-11-25</dt></request>||response||<response><a>ba</a><trans_id>1234</trans_id><b>ba</b><user uuid="xxx">joe</user><dt>2013-11-25</dt></response>""")
response = self.wait()
self.assertEqual(response.code, 200)
from stubo.model.db import Scenario
scenario_db = Scenario(db=self.db)
stubs = list(scenario_db.get_stubs('localhost:xslt'))
self.assertEqual(len(stubs), 1)
from stubo.model.stub import Stub
stub = Stub(stubs[0]['stub'], 'localhost:xslt')
self.assertEqual(stub.contains_matchers()[0],
u'<request><a>ba</a><trans_id>***</trans_id><b>ba</b><user uuid="***">***</user><dt>***</dt></request>\n')
self.assertEqual(stub.response_body()[0],
u'<response><a>ba</a><trans_id>***</trans_id><b>ba</b><user uuid="***">***</user><dt>***</dt></response>\n')
from datetime import date
self.assertEqual(stub.module(), {
u'system_date': str(date.today()),
u'recorded_system_date': str(date.today()),
u'name': u'mangler'})
开发者ID:ocoperations,项目名称:stubo-app,代码行数:30,代码来源:test_ext.py
示例2: store_source_recording
def store_source_recording(scenario_name_key, record_session):
host, scenario_name = scenario_name_key.split(':')
# use original put/stub payload logged in tracker
tracker = Tracker()
last_used = tracker.session_last_used(scenario_name_key,
record_session, 'record')
if not last_used:
# empty recordings are currently supported!
log.debug('Unable to find a recording for session={0}, scenario={1}'.format(record_session, scenario_name_key))
return
recording = tracker.get_last_recording(scenario_name, record_session,
last_used['start_time'])
recording = list(recording)
if not recording:
raise exception_response(400,
title="Unable to find a recording for scenario='{0}', record_session='{1}'".format(scenario_name, record_session))
number_of_requests = len(recording)
scenario_db = Scenario()
for nrequest in range(number_of_requests):
track = recording[nrequest]
request_text = track.get('request_text')
if not request_text:
raise exception_response(400, title='Unable to obtain recording details, was full tracking enabled?')
priority = int(track['request_params'].get('priority', nrequest+1))
stub = parse_stub(request_text, scenario_name_key,
track['request_params'])
stub.set_priority(priority)
scenario_db.insert_pre_stub(scenario_name_key, stub)
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:31,代码来源:api.py
示例3: get_all
def get_all(self, name=None):
if name and isinstance(name, dict) and '$regex' in name:
# {'$regex': 'localhost:.*'}
name = name['$regex'].partition(':')[0]
all_names = list(Scenario.get_all(self))
return [self.get(x.get('name')) for x in all_names if name in x.get('name')]
else:
return Scenario.get_all(self, name=name)
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:8,代码来源:testing.py
示例4: list_stubs
def list_stubs(handler, scenario_name, host=None):
cache = Cache(host or get_hostname(handler.request))
scenario = Scenario()
stubs = scenario.get_stubs(cache.scenario_key_name(scenario_name))
result = dict(version=version, data=dict(scenario=scenario_name))
if stubs:
result['data']['stubs'] = [x['stub'] for x in stubs]
return result
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:8,代码来源:api.py
示例5: get_stubs
def get_stubs(host, scenario_name=None):
if not scenario_name:
# get all stubs for this host
scenario_name_key = {'$regex': '{0}:.*'.format(host)}
else:
scenario_name_key = ":".join([host, scenario_name])
scenario = Scenario()
return scenario.get_stubs(scenario_name_key)
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:8,代码来源:api.py
示例6: rename_scenario
def rename_scenario(handler, scenario_name, new_name):
"""
Renames specified scenario, renames Stubs, reloads cache
:param handler: TrackRequest handler
:param scenario_name: <string> scenario name
:param new_name: <string> new scenario name
:return: <tuple> containing status code and message that will be returned
"""
response = {
'version': version
}
scenario = Scenario()
# getting hostname
host = handler.get_argument('host', get_hostname(handler.request))
# full names hostname:scenario_name
full_scenario_name = "{0}:{1}".format(host, scenario_name)
new_full_scenario_name = "{0}:{1}".format(host, new_name)
# getting scenario object
scenario_obj = scenario.get(full_scenario_name)
# checking if scenario exist, if not - quit
if scenario_obj is None:
handler.set_status(400)
handler.track.scenario = scenario_name
response['error'] = "Scenario not found. Name provided: {0}, host checked: {1}.".format(scenario_name, host)
log.debug("Scenario not found. Name provided: {0}, host checked: {1}.".format(scenario_name, host))
return response
# renaming scenario and all stubs, getting a dict with results
try:
response = scenario.change_name(full_scenario_name, new_full_scenario_name)
except Exception as ex:
handler.set_status()
log.debug("Failed to change scenario name, got error: %s" % ex)
response['error']['database'] = "Failed to change scenario name, got error: %s" % ex
try:
cache = Cache(host)
# change cache
scenario_sessions = cache.get_sessions_status(scenario_name)
# scenario sessions contains tuples [(u'myscenario_session2_1', u'dormant'), ....]
session_info = []
cache.delete_caches(scenario_name)
# rebuild cache
for session_name, mode in scenario_sessions:
cache.create_session_cache(new_name, session_name)
session_info.append({'name': session_name})
# sessions after creation go into playback mode, ending them
end_session(handler, session_name)
response['Remapped sessions'] = session_info
except Exception as ex:
log.debug("Failed to repopulate cache, got error: %s" % ex)
response['error']['cache'] = "Failed to repopulate cache, got error: %s" % ex
return response
开发者ID:rusenask,项目名称:mirage,代码行数:57,代码来源:handlers_mt.py
示例7: get_session_status
def get_session_status(handler, all_hosts=True):
scenario = Scenario()
host_scenarios = defaultdict()
# getting a dictionary with sizes for all scenarios
scenario_sizes = scenario.size()
scenarios_recorded = scenario.recorded()
for s in scenario.get_all():
host, scenario_name = s['name'].split(':')
if not all_hosts and get_hostname(handler.request) != host:
continue
if host not in host_scenarios:
host_scenarios[host] = {}
# getting session data
sessions = []
cache = Cache(host)
for session_name, session in cache.get_sessions(scenario_name):
# try and get the last_used from the last tracker get/response
# else when the begin/session playback was called
last_used = session_last_used(s['name'], session_name, 'playback')
if last_used:
last_used = last_used['start_time'].strftime('%Y-%m-%d %H:%M:%S')
else:
# session has never been used for playback
last_used = session.get('last_used', '-')
session['last_used'] = last_used
# removing stub information since we aren't using it anyway and it can consume a lot of memory
session.pop('stubs', None)
# creating sessions list
sessions.append(session)
# getting stub count
stub_counts = stub_count(host, scenario_name)['data']['count']
recorded = '-'
# adding session information
if sessions:
if stub_counts:
# getting scenario size and recorded values
scenario_size = 0
try:
scenario_size = scenario_sizes[s['name']]
recorded = scenarios_recorded[s['name']]
except KeyError:
log.debug("Could not get scenario size for: %s" % s['name'])
except Exception as ex:
log.warn("Failed to get scenario size for: %s, got error: %s" % (s['name'], ex))
# creating a dict with scenario information
host_scenarios[host][scenario_name] = (sessions, stub_counts, recorded, round(scenario_size, 0))
else:
host_scenarios[host][scenario_name] = (sessions, 0, '-', 0)
return host_scenarios
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:56,代码来源:api.py
示例8: list_scenarios
def list_scenarios(host):
response = {'version' : version}
scenario_db = Scenario()
if host == 'all':
scenarios = [x['name'] for x in scenario_db.get_all()]
else:
# get all scenarios for host
scenarios = [x['name'] for x in scenario_db.get_all(
{'$regex': '{0}:.*'.format(host)})]
response['data'] = dict(host=host, scenarios=scenarios)
return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:11,代码来源:api.py
示例9: get
def get(self, scenario_name):
"""
Returns scenario name, current sessions and their states,
stub count, total, document size. Also, provides direct URL link to stub list.
:param scenario_name: <string> scenario name
Response JSON:
{
"stubs": 32,
"space_used_kb": 840,
"recorded": "2015-07-15",
"name": "localhost:scenario_16",
"scenarioRef": "/stubo/api/v2/scenarios/objects/localhost:scenario_16"
}
"""
# check if hostname is supplied - if not, override scenario name with new value
scenario_name = _get_scenario_full_name(self, scenario_name)
# query MongoDB
document = yield self.db.scenario.find_one({'name': scenario_name})
# form a result dictionary
if document is not None:
# get stub count
stub_count = yield self.db.scenario_stub.find({'scenario': scenario_name}).count()
# get size
scenario_cl = Scenario()
size = scenario_cl.size(scenario_name)
# check if size is None
if size is None:
size = 0
recorded = scenario_cl.recorded(scenario_name)
if recorded is None:
recorded = '-'
host, scenario = scenario_name.split(':')
# getting session data
sessions = []
cache = Cache(host)
for session_info in cache.get_scenario_sessions_information(scenario):
sessions.append(session_info)
result_dict = {'name': scenario_name,
'stub_count': stub_count,
'recorded': recorded,
'space_used_kb': int(size),
'scenarioRef': '/stubo/api/v2/scenarios/objects/{0}'.format(scenario_name),
'sessions': sessions}
self.set_status(200)
self.write(result_dict)
else:
self.send_error(404)
开发者ID:JohnFDavenport,项目名称:mirage,代码行数:53,代码来源:handlers.py
示例10: delete_stubs
def delete_stubs(handler, scenario_name=None, host=None, force=False):
"""delete all data relating to one named scenario or host/s."""
log.debug('delete_stubs')
response = {
'version': version
}
scenario_db = Scenario()
static_dir = handler.settings['static_path']
def delete_scenario(sce_name_key, frc):
log.debug(u'delete_scenario: {0}'.format(sce_name_key))
# getting host and scenario names
hst, sce_name = sce_name_key.split(':')
cache = Cache(hst)
if not frc:
active_sessions = cache.get_active_sessions(sce_name,
local=False)
if active_sessions:
raise exception_response(400,
title='Sessons in playback/record, can not delete. '
'Found the following active sessions: {0} '
'for scenario: {1}'.format(active_sessions, sce_name))
scenario_db.remove_all(sce_name_key)
cache.delete_caches(sce_name)
scenarios = []
if scenario_name:
# if scenario_name exists it takes priority
handler.track.scenario = scenario_name
hostname = host or get_hostname(handler.request)
scenarios.append(':'.join([hostname, scenario_name]))
elif host:
if host == 'all':
scenarios = [x['name'] for x in scenario_db.get_all()]
export_dir = os.path.join(static_dir, 'exports')
if os.path.exists(export_dir):
log.info('delete export dir')
shutil.rmtree(export_dir)
else:
# get all scenarios for host
scenarios = [x['name'] for x in scenario_db.get_all(
{'$regex': '{0}:.*'.format(host)})]
else:
raise exception_response(400,
title='scenario or host argument required')
for scenario_name_key in scenarios:
delete_scenario(scenario_name_key, force)
response['data'] = dict(message='stubs deleted.', scenarios=scenarios)
return response
开发者ID:rusenask,项目名称:mirage,代码行数:51,代码来源:api.py
示例11: get_stubs
def get_stubs(self, name=None):
if name and isinstance(name, dict) and '$regex' in name:
name = name['$regex'].partition(':')[0]
all_names = list(self.db.scenario_stub.find({}))
result = [x for x in all_names if name in x.get('scenario')]
else:
result = Scenario.get_stubs(self, name=name)
return result
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:8,代码来源:testing.py
示例12: stub_count
def stub_count(host, scenario_name=None):
if host == 'all':
scenario_name_key = None
else:
if not scenario_name:
# get all stubs for this host
value = '{0}:.*'.format(host)
scenario_name_key = {'$regex': value}
else:
scenario_name_key = ":".join([host, scenario_name])
scenario = Scenario()
result = {'version' : version}
count = scenario.stub_count(scenario_name_key)
result['data'] = {'count' : count,
'scenario' : scenario_name or 'all',
'host' : host}
return result
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:17,代码来源:api.py
示例13: get_session_status
def get_session_status(handler, all_hosts=True):
scenario = Scenario()
host_scenarios = {}
for s in scenario.get_all():
host, scenario_name = s['name'].split(':')
if not all_hosts and get_hostname(handler.request) != host:
continue
if host not in host_scenarios:
host_scenarios[host] = {}
sessions = []
cache = Cache(host)
for session_name, session in cache.get_sessions(scenario_name):
# try and get the last_used from the last tracker get/response
# else when the begin/session playback was called
last_used = session_last_used(s['name'], session_name, 'playback')
if last_used:
last_used = last_used['start_time'].strftime('%Y-%m-%d %H:%M:%S')
else:
# session has never been used for playback
last_used = session.get('last_used', '-')
session['last_used'] = last_used
sessions.append(session)
stub_counts = stub_count(host, scenario_name)['data']['count']
recorded = '-'
space_used = 0
if sessions:
if stub_counts:
stubs = list(get_stubs(host, scenario_name))
recorded = max(x['stub'].get('recorded') for x in stubs)
for stub in stubs:
stub = Stub(stub['stub'], s['name'])
space_used += stub.space_used()
host_scenarios[host][scenario_name] = (sessions, stub_counts,
recorded, human_size(space_used))
else:
host_scenarios[host][scenario_name] = (sessions, 0, '-', 0)
return host_scenarios
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:37,代码来源:api.py
示例14: __init__
def __init__(self):
dbname = testdb_name()
client = getattr(mg.conn, dbname)
Scenario.__init__(self, db=client)
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:4,代码来源:testing.py
注:本文中的stubo.model.db.Scenario类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论