本文整理汇总了Python中stubo.model.stub.Stub类的典型用法代码示例。如果您正苦于以下问题:Python Stub类的具体用法?Python Stub怎么用?Python Stub使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Stub类的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_put_stub
def test_put_stub(self):
self.http_client.fetch(self.get_url('/stubo/api/put/module?name=/static/cmds/tests/ext/split/splitter.py'),
self.stop)
response = self.wait()
self.assertEqual(response.code, 200)
self.http_client.fetch(self.get_url('/stubo/api/begin/session?scenario=split&session=split_1&mode=record'),
self.stop)
response = self.wait()
self.assertEqual(response.code, 200)
self.http_client.fetch(self.get_url('/stubo/api/put/stub?session=split_1&ext_module=splitter'),
callback=self.stop,
method="POST",
body="""||textMatcher||<a>
<pre>hello</pre>
<id>xxx</id>
<post>goodbye</post>
</a>||response||Hello {{1+1}} World""")
response = self.wait()
self.assertEqual(response.code, 200)
from stubo.model.db import Scenario
scenario_db = Scenario(db=self.db)
stubs = list(scenario_db.get_stubs('localhost:split'))
self.assertEqual(len(stubs), 1)
from stubo.model.stub import Stub
stub = Stub(stubs[0]['stub'], 'localhost:split')
self.assertEqual(stub.contains_matchers(),
[u'<a>\n<pre>hello</pre>\n', '\n<post>goodbye</post>\n</a>'])
self.assertEqual(stub.response_body()[0],
u'Hello {{1+1}} World')
from datetime import date
self.assertEqual(stub.module(), {
u'system_date': str(date.today()),
u'recorded_system_date': str(date.today()),
u'name': u'splitter'})
开发者ID:ocoperations,项目名称:stubo-app,代码行数:34,代码来源:test_ext.py
示例2: test_update_dormat_session_with_stubs_and_delay
def test_update_dormat_session_with_stubs_and_delay(self):
self.hash.set('localhost:foo', 'bar', {'status': 'dormant',
'session': 'bar',
'scenario': 'localhost:foo'})
delay_policy = {"delay_type": "fixed", "name": "slow",
"milliseconds": "500"}
self.hash.set('localhost:delay_policy', 'slow', delay_policy)
self._make_scenario('localhost:foo')
from stubo.model.stub import create, Stub
stub = Stub(create('<test>match this</test>', '<test>OK</test>'),
'localhost:foo')
stub.set_delay_policy('slow')
doc = dict(scenario='localhost:foo', stub=stub)
self.scenario.insert_stub(doc, stateful=True)
self._get_cache().create_session_cache('foo', 'bar')
session = self.hash.get('localhost:foo', 'bar')
self.assertTrue('stubs' in session)
stubs = session['stubs']
self.assertEqual(len(stubs), 1)
from stubo.model.stub import StubCache
stub = StubCache(stubs[0], session["scenario"], session['session'])
self.assertEqual(stub.delay_policy(), delay_policy)
开发者ID:rusenask,项目名称:mirage,代码行数:26,代码来源:test_cache.py
示例3: test_update_dormant_session_with_stubs
def test_update_dormant_session_with_stubs(self):
self.hash.set('somehost:foo', 'bar', {'status' : 'dormant',
'session' : 'bar',
'scenario' : 'localhost:foo'})
scenario_name = 'foo'
self._make_scenario('localhost:foo')
from stubo.model.stub import create, Stub, response_hash
stub = Stub(create('<test>match this</test>', '<test>OK</test>'),
'localhost:foo')
doc = dict(scenario='localhost:foo', stub=stub)
self.scenario.insert_stub(doc, stateful=True)
cache = self._get_cache()
cache.create_session_cache('foo', 'bar')
session = self.hash.get('localhost:foo', 'bar')
self.assertEqual(session["status"], "playback")
self.assertEqual(session['session'], 'bar')
self.assertEqual(session["scenario"], "localhost:foo")
self.assertTrue('stubs' in session)
stubs = session['stubs']
self.assertEqual(len(stubs), 1)
from stubo.model.stub import StubCache
stub = StubCache(stubs[0], session["scenario"], session['session'])
self.assertEqual(stub.contains_matchers(), ["<test>match this</test>"])
self.assertEqual(stub.response_ids(), [response_hash('<test>OK</test>',
stub)])
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:27,代码来源:test_cache.py
示例4: test_new_session_with_state
def test_new_session_with_state(self):
scenario_name = 'foo'
self._make_scenario('localhost:foo')
from stubo.model.stub import create, Stub, response_hash
stub = Stub(create('<test>match this</test>', '<test>OK</test>'),
'localhost:foo')
doc = dict(scenario='localhost:foo', stub=stub)
self.scenario.insert_stub(doc, stateful=True)
stub2 = Stub(create('<test>match this</test>', '<test>BAD</test>'),
'localhost:foo')
doc2 = dict(scenario='localhost:foo', stub=stub2)
self.scenario.insert_stub(doc2, stateful=True)
cache = self._get_cache()
cache.create_session_cache('foo', 'bar')
session = self.hash.get('localhost:foo', 'bar')
self.assertEqual(session["status"], "playback")
self.assertEqual(session['session'], 'bar')
self.assertEqual(session["scenario"], "localhost:foo")
self.assertTrue('stubs' in session)
stubs = session['stubs']
self.assertEqual(len(stubs), 1)
from stubo.model.stub import StubCache
stub = StubCache(stubs[0], session["scenario"], session['session'])
responses = stub.response_ids()
self.assertEqual(len(responses), 2)
self.assertEqual(stub.contains_matchers(), ["<test>match this</test>"])
self.assertEqual(responses, [response_hash('<test>OK</test>', stub),
response_hash('<test>BAD</test>', stub2)])
self.assertEqual(self.hash.get_raw('localhost:sessions', 'bar'), 'foo')
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:32,代码来源:test_cache.py
示例5: insert_stub
def insert_stub(self, doc, stateful):
"""
Insert stub into DB. Performs a check whether this stub already exists in database or not. If it exists
and stateful is True - new response is appended to the response list, else - reports that duplicate stub
found and it will not be inserted.
:param doc: Stub class with Stub that will be inserted
:param stateful: <boolean> specify whether stub insertion should be stateful or not
:return: <string> message with insertion status:
ignored - if not stateful and stub was already present
updated - if stateful and stub was already present
created - if stub was not present in database
"""
# getting initial values - stub matchers, scenario name
matchers = doc["stub"].contains_matchers()
scenario = doc["scenario"]
matchers_hash = self._create_hash(matchers)
# check if we have matchers - should be None for REST calls
if matchers is not None:
# additional helper value for indexing
doc["matchers_hash"] = matchers_hash
matched_stub = self.get_matched_stub(name=scenario, matchers_hash=matchers_hash)
# checking if stub already exists
if matched_stub:
# creating stub object from found document
the_stub = Stub(matched_stub["stub"], scenario)
if not stateful and doc["stub"].response_body() == the_stub.response_body():
msg = "duplicate stub found, not inserting."
log.warn(msg)
result = {"status": "ignored", "msg": msg, "key": str(matched_stub["_id"])}
return result
# since stateful is true - updating stub body by extending the list
log.debug(
"In scenario: {0} found exact match for matchers:"
" {1}. Perform stateful update of stub.".format(scenario, matchers)
)
response = the_stub.response_body()
response.extend(doc["stub"].response_body())
the_stub.set_response_body(response)
# updating Stub body and size, writing to database
self.db.scenario_stub.update(
{"_id": matched_stub["_id"]},
{"$set": {"stub": the_stub.payload, "space_used": len(unicode(the_stub.payload))}},
)
result = {"status": "updated", "msg": "Updated with stateful response", "key": str(matched_stub["_id"])}
return result
# inserting stub into DB
status = self.db.scenario_stub.insert(self.get_stub_document(doc))
result = {"status": "created", "msg": "Inserted scenario_stub", "key": str(status)}
return result
开发者ID:rusenask,项目名称:mirage,代码行数:53,代码来源:db.py
示例6: test_with_delay
def test_with_delay(self):
scenario_name = 'foo'
self._make_scenario('localhost:foo')
from stubo.model.stub import Stub, create
stub = Stub(create('<test>match this</test>', '<test>OK</test>'),
'localhost:foo')
stub.set_delay_policy(dict(name='slow'))
doc = dict(scenario='localhost:foo', stub=stub)
self.scenario.insert_stub(doc, stateful=True)
cache = self._get_cache()
session = cache.create_session_cache('foo', 'bar')
self._func(session, stub)
self.assertEqual(self.hash.get('localhost:foo:request',
'bar:1'),
[[u'1'], u'slow', None, u'2013-09-05', {}, u'12b0a0eced1ec13b53d186be7bd4909fa94d1916cca4daa25bd24d48'])
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:15,代码来源:test_cache.py
示例7: test_it_with_module
def test_it_with_module(self):
module = {"system_date": "2013-08-07", "version": 1, "name": "mymodule"}
scenario_name = 'foo'
self._make_scenario('localhost:foo')
from stubo.model.stub import create, Stub, response_hash
stub = Stub(create('<test>match this</test>', '<test>OK</test>'),
'localhost:foo')
stub.set_module(module)
doc = dict(scenario='localhost:foo', stub=stub)
self.scenario.insert_stub(doc, stateful=True)
cache = self._get_cache()
session = cache.create_session_cache('foo', 'bar')
self._func(session, stub)
self.assertEqual(self.hash.get('localhost:foo:request',
'bar:1'),
[[u'1'], u'', None, u'2013-09-05', module, u'12b0a0eced1ec13b53d186be7bd4909fa94d1916cca4daa25bd24d48'])
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:16,代码来源:test_cache.py
示例8: test_update_dormat_session_with_stubs_and_module
def test_update_dormat_session_with_stubs_and_module(self):
self.hash.set('localhost:foo', 'bar', {'status' : 'dormant',
'session' : 'bar',
'scenario' : 'localhost:foo'})
module = {"system_date": "2013-09-24", "version": 1, "name": "funcky"}
self._make_scenario('localhost:foo')
from stubo.model.stub import create, Stub
stub = Stub(create('<test>match this</test>', '<test>OK</test>'),
'localhost:foo')
stub.set_module(module)
doc = dict(scenario='localhost:foo', stub=stub)
self.scenario.insert_stub(doc, stateful=True)
self._get_cache().create_session_cache('foo', 'bar')
session = self.hash.get('localhost:foo', 'bar')
self.assertTrue('stubs' in session)
stubs = session['stubs']
self.assertEqual(len(stubs), 1)
from stubo.model.stub import StubCache
stub = StubCache(stubs[0], session["scenario"], session['session'])
self.assertEqual(stub.module(), module)
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:21,代码来源:test_cache.py
示例9: insert_stub
def insert_stub(self, doc, stateful):
from stubo.model.stub import Stub
matchers = doc['stub'].contains_matchers()
scenario = doc['scenario']
stubs_cursor = self.get_stubs(scenario)
if stubs_cursor.count():
for stub in stubs_cursor:
the_stub = Stub(stub['stub'], scenario)
if matchers and matchers == the_stub.contains_matchers():
if not stateful and \
doc['stub'].response_body() == the_stub.response_body():
msg = 'duplicate stub found, not inserting.'
log.warn(msg)
return msg
log.debug('In scenario: {0} found exact match for matchers:'
' {1}. Perform stateful update of stub.'.format(scenario,
matchers))
response = the_stub.response_body()
response.extend(doc['stub'].response_body())
the_stub.set_response_body(response)
self.db.scenario_stub.update(
{'_id': ObjectId(stub['_id'])},
{'$set' : {'stub' : the_stub.payload}})
return 'updated with stateful response'
doc['stub'] = doc['stub'].payload
status = self.db.scenario_stub.insert(doc)
return 'inserted scenario_stub: {0}'.format(status)
开发者ID:erowan,项目名称:stubo-app,代码行数:27,代码来源:db.py
示例10: get_session_status
def get_session_status(handler, all_hosts=True):
scenario = Scenario()
host_scenarios = {}
for s in scenario.get_all():
host, scenario_name = s['name'].split(':')
if not all_hosts and get_hostname(handler.request) != host:
continue
if host not in host_scenarios:
host_scenarios[host] = {}
sessions = []
cache = Cache(host)
for session_name, session in cache.get_sessions(scenario_name):
# try and get the last_used from the last tracker get/response
# else when the begin/session playback was called
last_used = session_last_used(s['name'], session_name, 'playback')
if last_used:
last_used = last_used['start_time'].strftime('%Y-%m-%d %H:%M:%S')
else:
# session has never been used for playback
last_used = session.get('last_used', '-')
session['last_used'] = last_used
sessions.append(session)
stub_counts = stub_count(host, scenario_name)['data']['count']
recorded = '-'
space_used = 0
if sessions:
if stub_counts:
stubs = list(get_stubs(host, scenario_name))
recorded = max(x['stub'].get('recorded') for x in stubs)
for stub in stubs:
stub = Stub(stub['stub'], s['name'])
space_used += stub.space_used()
host_scenarios[host][scenario_name] = (sessions, stub_counts,
recorded, human_size(space_used))
else:
host_scenarios[host][scenario_name] = (sessions, 0, '-', 0)
return host_scenarios
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:37,代码来源:api.py
示例11: insert_stub
def insert_stub(self, doc, stateful):
"""
Insert stub into DB. Performs a check whether this stub already exists in database or not. If it exists
and stateful is True - new response is appended to the response list, else - reports that duplicate stub
found and it will not be inserted.
:param doc: Stub class with Stub that will be inserted
:param stateful: <boolean> specify whether stub insertion should be stateful or not
:return: <string> message with insertion status
"""
matchers = doc['stub'].contains_matchers()
scenario = doc['scenario']
stubs_cursor = self.get_stubs(scenario)
if stubs_cursor.count():
for stub in stubs_cursor:
the_stub = Stub(stub['stub'], scenario)
if matchers and matchers == the_stub.contains_matchers():
if not stateful and \
doc['stub'].response_body() == the_stub.response_body():
msg = 'duplicate stub found, not inserting.'
log.warn(msg)
return msg
log.debug('In scenario: {0} found exact match for matchers:'
' {1}. Perform stateful update of stub.'.format(scenario,
matchers))
response = the_stub.response_body()
response.extend(doc['stub'].response_body())
the_stub.set_response_body(response)
self.db.scenario_stub.update(
{'_id': ObjectId(stub['_id'])},
{'$set' : {'stub' : the_stub.payload}})
return 'updated with stateful response'
doc['stub'] = doc['stub'].payload
status = self.db.scenario_stub.insert(doc)
self.db.scenario_stub.create_index([("stub.priority", ASCENDING), ("scenario", ASCENDING)])
return 'inserted scenario_stub: {0}'.format(status)
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:36,代码来源:db.py
示例12: export_stubs_to_commands_format
def export_stubs_to_commands_format(handler, scenario_name):
"""
Exports scenario to .commands file format.
:param handler:
:param scenario_name: <string> Scenario name
:return: :raise exception_response:
"""
cache = Cache(get_hostname(handler.request))
scenario_name_key = cache.scenario_key_name(scenario_name)
# use user arg or epoch time
session_id = handler.get_argument('session_id', int(time.time()))
session = u'{0}_{1}'.format(scenario_name, session_id)
cmds = [
'delete/stubs?scenario={0}'.format(scenario_name),
'begin/session?scenario={0}&session={1}&mode=record'.format(
scenario_name, session)
]
files = []
scenario = Scenario()
# get scenario pre stubs for specified scenario
stubs = list(scenario.get_pre_stubs(scenario_name_key))
if stubs:
for i in range(len(stubs)):
entry = stubs[i]
stub = Stub(entry['stub'], scenario_name_key)
# if stub is rest - matcher may be None, checking that
if stub.contains_matchers() is None:
cmds.append('# Stub skipped since no matchers were found. Consider using .yaml format for additional '
'capabilities')
# skipping to next stub, this stub is not compatible with .commands format
continue
matchers = [('{0}_{1}_{2}.textMatcher'.format(session, i, x), stub.contains_matchers()[x])
for x in range(len(stub.contains_matchers()))]
matchers_str = ",".join(x[0] for x in matchers)
url_args = stub.args()
url_args['session'] = session
module_info = stub.module()
if module_info:
# Note: not including put/module in the export, modules are shared
# by multiple scenarios.
url_args['ext_module'] = module_info['name']
url_args['stub_created_date'] = stub.recorded()
url_args['stubbedSystemDate'] = module_info.get('recorded_system_date')
url_args['system_date'] = module_info.get('system_date')
url_args = urlencode(url_args)
responses = stub.response_body()
assert(len(responses) == 1)
response = responses[0]
response = ('{0}_{1}.response'.format(session, i), response)
cmds.append('put/stub?{0},{1},{2}'.format(url_args, matchers_str,
response[0]))
files.append(response)
files.extend(matchers)
else:
cmds.append('put/stub?session={0},text=a_dummy_matcher,text=a_dummy_response'.format(session))
cmds.append('end/session?session={0}'.format(session))
runnable = asbool(handler.get_argument('runnable', False))
runnable_info = dict()
if runnable:
playback_session = handler.get_argument('playback_session', None)
if not playback_session:
raise exception_response(400,
title="'playback_session' argument required with 'runnable")
runnable_info['playback_session'] = playback_session
tracker = Tracker()
last_used = tracker.session_last_used(scenario_name_key,
playback_session, 'playback')
if not last_used:
raise exception_response(400,
title="Unable to find playback session")
runnable_info['last_used'] = dict(remote_ip=last_used['remote_ip'],
start_time=str(last_used['start_time']))
playback = tracker.get_last_playback(scenario_name, playback_session,
last_used['start_time'])
playback = list(playback)
if not playback:
raise exception_response(400,
title="Unable to find a playback for scenario='{0}', playback_session='{1}'".format(scenario_name, playback_session))
cmds.append('begin/session?scenario={0}&session={1}&mode=playback'.format(
scenario_name, session))
number_of_requests = len(playback)
runnable_info['number_of_playback_requests'] = number_of_requests
for nrequest in range(number_of_requests):
track = playback[nrequest]
request_text = track.get('request_text')
if not request_text:
raise exception_response(400, title='Unable to obtain playback details, was full tracking enabled?')
request_file_name = '{0}_{1}.request'.format(session, nrequest)
files.append((request_file_name, request_text))
stubo_response_text = track['stubo_response']
if not isinstance(stubo_response_text, basestring):
stubo_response_text = unicode(stubo_response_text)
stubo_response_file_name = '{0}_{1}.stubo_response'.format(session, nrequest)
files.append((stubo_response_file_name, stubo_response_text))
#.........这里部分代码省略.........
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:101,代码来源:export_commands.py
示例13: begin_session
def begin_session(handler, scenario_name, session_name, mode, system_date=None,
warm_cache=False):
"""
Begins session for given scenario
:param handler: request handler class
:param scenario_name: scenario name
:param session_name: session name
:param mode: mode - record, playback
:param system_date:
:param warm_cache:
:return: :raise exception_response:
"""
log.debug('begin_session')
response = {
'version': version
}
scenario_manager = Scenario()
cache = Cache(get_hostname(handler.request))
if cache.blacklisted():
raise exception_response(400, title="Sorry the host URL '{0}' has been "
"blacklisted. Please contact Stub-O-Matic support.".format(cache.host))
# checking whether full name (with hostname) was passed, if not - getting full name
# scenario_name_key = "localhost:scenario_1"
if ":" not in scenario_name:
scenario_name_key = cache.scenario_key_name(scenario_name)
else:
# setting scenario full name
scenario_name_key = scenario_name
# removing hostname from scenario name
scenario_name = scenario_name.split(":")[1]
# get scenario document
scenario_doc = scenario_manager.get(scenario_name_key)
if not scenario_doc:
raise exception_response(404,
title='Scenario not found - {0}. To begin a'
' session - create a scenario.'.format(scenario_name_key))
cache.assert_valid_session(scenario_name, session_name)
if mode == 'record':
log.debug('begin_session, mode=record')
# check if there are any existing stubs in this scenario
if scenario_manager.stub_count(scenario_name_key) > 0:
err = exception_response(400,
title='Scenario ({0}) has existing stubs, delete them before '
'recording or create another scenario!'.format(scenario_name_key))
raise err
scenario_id = scenario_doc['_id']
log.debug('new scenario: {0}'.format(scenario_id))
session_payload = {
'status': 'record',
'scenario': scenario_name_key,
'scenario_id': str(scenario_id),
'session': str(session_name)
}
cache.set_session(scenario_name, session_name, session_payload)
log.debug('new redis session: {0}:{1}'.format(scenario_name_key,
session_name))
response["data"] = {
'message': 'Record mode initiated....',
}
response["data"].update(session_payload)
cache.set_session_map(scenario_name, session_name)
log.debug('finish record')
elif mode == 'playback':
recordings = cache.get_sessions_status(scenario_name,
status='record',
local=False)
if recordings:
raise exception_response(400, title='Scenario recordings taking '
'place - {0}. Found the '
'following record sessions: {1}'.format(scenario_name_key, recordings))
cache.create_session_cache(scenario_name, session_name, system_date)
if warm_cache:
# iterate over stubs and call get/response for each stub matchers
# to build the request & request_index cache
# reset request_index to 0
log.debug("warm cache for session '{0}'".format(session_name))
scenario_manager = Scenario()
for payload in scenario_manager.get_stubs(scenario_name_key):
stub = Stub(payload['stub'], scenario_name_key)
mock_request = " ".join(stub.contains_matchers())
handler.request.body = mock_request
get_response(handler, session_name)
cache.reset_request_index(scenario_name)
response["data"] = {
"message": "Playback mode initiated...."
}
response["data"].update({
"status": "playback",
"scenario": scenario_name_key,
"session": str(session_name)
})
else:
raise exception_response(400,
#.........这里部分代码省略.........
开发者ID:JohnFDavenport,项目名称:mirage,代码行数:101,代码来源:api_v2.py
示例14: insert_stub
def insert_stub(self, doc, stateful):
"""
Insert stub into DB. Performs a check whether this stub already exists in database or not. If it exists
and stateful is True - new response is appended to the response list, else - reports that duplicate stub
found and it will not be inserted.
:param doc: Stub class with Stub that will be inserted
:param stateful: <boolean> specify whether stub insertion should be stateful or not
:return: <string> message with insertion status
"""
# getting initial values - stub matchers, scenario name
matchers = doc['stub'].contains_matchers()
scenario = doc['scenario']
matchers_hash = self._create_hash(matchers)
# check if we have matchers - should be None for REST calls
if matchers is not None:
# additional helper value for indexing
doc['matchers_hash'] = matchers_hash
matched_stub = self.get_matched_stub(name=scenario, matchers_hash=matchers_hash)
# checking if stub already exists
if matched_stub:
# creating stub object from found document
the_stub = Stub(matched_stub['stub'], scenario)
if not stateful and doc['stub'].response_body() == the_stub.response_body():
msg = 'duplicate stub found, not inserting.'
log.warn(msg)
return msg
# since stateful is true - updating stub body by extending the list
log.debug('In scenario: {0} found exact match for matchers:'
' {1}. Perform stateful update of stub.'.format(scenario, matchers))
response = the_stub.response_body()
response.extend(doc['stub'].response_body())
the_stub.set_response_body(response)
# updating Stub body and size, writing to database
self.db.scenario_stub.update(
{'_id': matched_stub['_id']},
{'$set': {'stub': the_stub.payload,
'space_used': len(unicode(the_stub.payload))}})
return 'updated with stateful response'
# Stub doesn't exist in DB - preparing new object
doc['stub'] = doc['stub'].payload
# additional helper for aggregation framework
try:
doc['recorded'] = doc['stub']['recorded']
except KeyError:
# during tests "recorded" value is not supplied
pass
# calculating stub size
doc['space_used'] = len(unicode(doc['stub']))
# inserting stub into DB
status = self.db.scenario_stub.insert(doc)
# create indexes
if matchers_hash:
self._create_index(key="matchers_hash")
# creating index for scenario and priority
self._create_index(key="scenario")
if 'priority' in doc['stub']:
# creating index for priority
self._create_index("stub.priority")
return 'inserted scenario_stub: {0}'.format(status)
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:67,代码来源:db.py
示例15: begin_session
def begin_session(handler, scenario_name, session_name, mode, system_date=None,
warm_cache=False):
log.debug('begin_session')
response = {
'version' : version
}
scenario_col = Scenario()
cache = Cache(get_hostname(handler.request))
if cache.blacklisted():
raise exception_response(400, title="Sorry the host URL '{0}' has been "
"blacklisted. Please contact Stub-O-Matic support.".format(cache.host))
scenario_name_key = cache.scenario_key_name(scenario_name)
scenario = scenario_col.get(scenario_name_key)
cache.assert_valid_session(scenario_name, session_name)
if mode == 'record':
log.debug('begin_session, mode=record')
# precond: delete/stubs?scenario={scenario_name}
if scenario:
err = exception_response(400,
title='Duplicate scenario found - {0}'.format(scenario_name_key))
raise err
if scenario_col.stub_count(scenario_name_key) != 0:
raise exception_response(500,
title='stub_count !=0 for scenario: {0}'.format(
scenario_name_key))
scenario_id = scenario_col.insert(name=scenario_name_key)
log.debug('new scenario: {0}'.format(scenario_id))
session_payload = {
'status' : 'record',
'scenario' : scenario_name_key,
'scenario_id' : str(scenario_id),
'session' : str(session_name)
}
cache.set_session(scenario_name, session_name, session_payload)
log.debug('new redis session: {0}:{1}'.format(scenario_name_key,
session_name))
response["data"] = {
'message' : 'Record mode initiated....',
}
response["data"].update(session_payload)
cache.set_session_map(scenario_name, session_name)
log.debug('finish record')
elif mode == 'playback':
if not scenario:
raise exception_response(400,
title='Scenario not found - {0}'.format(scenario_name_key))
recordings = cache.get_sessions_status(scenario_name,
status=('record'),
local=False)
if recordings:
raise exception_response(400, title='Scenario recordings taking ' \
'place - {0}. Found the following record sessions: {1}'.format(
scenario_name_key, recordings))
cache.create_session_cache(scenario_name, session_name, system_date)
if warm_cache:
# iterate over stubs and call get/response for each stub matchers
# to build the request & request_index cache
# reset request_index to 0
log.debug("warm cache for session '{0}'".format(session_name))
scenario_col = Scenario()
for payload in scenario_col.get_stubs(scenario_name_key):
stub = Stub(payload['stub'], scenario_name_key)
mock_request = " ".join(stub.contains_matchers())
handler.request.body = mock_request
get_response(handler, session_name)
cache.reset_request_index(scenario_name)
response["data"] = {
"message" : "Playback mode initiated...."
}
response["data"].update({
"status" : "playback",
"scenario" : scenario_name_key,
"session" : str(session_name)
})
else:
raise exception_response(400,
title='Mode of playback or record required')
return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:80,代码来源:api.py
示例16: create_session_cache
def create_session_cache(self, scenario_name, session_name,
system_date=None):
scenario_key = self.scenario_key_name(scenario_name)
log.debug("create_session_cache: scenario_key={0}, session_name={1}".format(
scenario_key, session_name))
session = self.get(scenario_key, session_name)
if not session:
# must be using a different session name for playback than record
session = {
'session' : session_name,
'scenario' : scenario_key
}
# add to sessions map
self.set_raw('{0}:sessions'.format(self.host), session_name, scenario_name)
session['status'] = 'playback'
session['system_date'] = system_date or datetime.date.today().strftime(
'%Y-%m-%d')
session['last_used'] = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
cache_info = []
# copy mongo scenario stubs to redis cache
scenario_col = Scenario()
stubs_cursor = scenario_col.get_stubs(scenario_key)
stubs = list(stubs_cursor)
if not stubs:
raise exception_response(500,
title="found no stubs in mongo for {0}".format(scenario_key))
from stubo.ext.module import Module
for scenario_stub in stubs:
stub = Stub(scenario_stub['stub'], scenario_stub['scenario'])
if stub.module():
module_name = stub.module()['name']
# tag this stub with the latest version of the module
module = Module(self.host)
version = module.latest_version(module_name)
if not version:
raise exception_response(500,
title="module '{0}' not found in cache".format(
module.key(module_name)))
stub.module()['version'] = version
response_ids = []
response_bodys = stub.response_body()
# cache each response id -> response (text, status) etc
for response_text in response_bodys:
stub.set_response_body(response_text)
response_id = response_hash(response_text, stub)
self.set_response(scenario_name, session_name, response_id,
stub.response())
response_ids.append(response_id)
# replace response text with response hash ids for session cache
stub.response().pop('body', None)
stub.response()['ids'] = response_ids
delay_policy_name = stub.delay_policy()
if delay_policy_name:
# Note: the delay policy is not really cached with the session.
# The get/response call will just use the name to get the latest
# delay value from the 'delay_policy' key in redis.
delay_policy_key = '{0}:delay_policy'.format(self.host)
delay_policy = self.get(delay_policy_key, delay_policy_name)
if not delay_policy:
log.warn('unable to find delay_policy: {0}'.format(
delay_policy_name))
stub.set_delay_policy(delay_policy)
#_id = ObjectId(scenario_stub['_id'])
#stub['recorded'] = str(_id.generation_time.date())
cache_info.append(stub.payload)
session['stubs'] = cache_info
#log.debug('stubs: {0}'.format(session['stubs']))
self.set(scenario_key, session_name, session)
log.debug('created session cache: {0}:{1}'.format(session['scenario'],
session['session']))
return session
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:75,代码来源:__init__.py
注:本文中的stubo.model.stub.Stub类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论