本文整理汇总了Python中stubo.cache.Cache类的典型用法代码示例。如果您正苦于以下问题:Python Cache类的具体用法?Python Cache怎么用?Python Cache使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Cache类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: delete
def delete(self, scenario_name):
"""
Deletes scenario object from database. Fails if there are existing stubs
in the database
:param scenario_name: <string> scenario name
"""
scenario_name = _get_scenario_full_name(self, scenario_name)
query = {'name': scenario_name}
# query MongoDB
document = yield self.db.scenario.find_one(query)
if document is not None:
# delete document
yield self.db.scenario.remove(query)
try:
# scenario name contains host, splitting it and using this value to clear cache
host, name = scenario_name.split(":")
cache = Cache(host)
# change cache
cache.delete_caches(name)
except Exception as ex:
log.warn("Failed to delete caches for scenario: %s. Got error: %s" % (scenario_name, ex))
self.set_status(200)
else:
self.send_error(412, reason="Precondition failed - scenario (%s) does not exist." % scenario_name)
开发者ID:JohnFDavenport,项目名称:mirage,代码行数:29,代码来源:handlers.py
示例2: export_stubs
def export_stubs(handler, scenario_name):
from stubo.model.exporter import YAML_FORMAT_SUBDIR
# export stubs in the old format
command_links = export_stubs_to_commands_format(handler, scenario_name)
# continue stub export in the new format
cache = Cache(get_hostname(handler.request))
scenario_name_key = cache.scenario_key_name(scenario_name)
exporter = Exporter(static_dir = handler.settings['static_path'])
runnable = asbool(handler.get_argument('runnable', False))
playback_session = handler.get_argument('playback_session', None)
export_dir_path, files, runnable_info = exporter.export(scenario_name_key,
runnable=runnable,
playback_session=playback_session,
session_id=handler.get_argument('session_id', None),
export_dir=handler.get_argument('export_dir', None))
# getting export links
yaml_links = get_export_links(handler, scenario_name_key+"/"+YAML_FORMAT_SUBDIR, files)
payload = dict(scenario=scenario_name, export_dir_path=export_dir_path,
command_links=command_links, yaml_links=yaml_links)
if runnable_info:
payload['runnable'] = runnable_info
return dict(version=version, data=payload)
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:25,代码来源:api.py
示例3: put_bookmark
def put_bookmark(handler, session_name, name):
cache = Cache(get_hostname(handler.request))
response = dict(version=version, data = {})
if not session_name:
raise exception_response(400, title="No session provided")
scenario_key = cache.find_scenario_key(session_name)
scenario_name = scenario_key.partition(':')[-1]
# retrieve the request index state for selected session
index_state = {}
request_index_data = cache.get_request_index_data(scenario_name)
if request_index_data:
for k, v in request_index_data.iteritems():
indexed_session_name, _, stub_key = k.partition(':')
if indexed_session_name == session_name:
index_state[stub_key] = v
if not index_state:
raise exception_response(400,
title="No indexes found for session '{0}'. Is the session in "
'playback mode and have state?'.format(session_name))
log.debug("save request index state '{0}' = {1}".format(name, index_state))
cache.set_saved_request_index_data(scenario_name, name, index_state)
response['data'][name] = index_state
return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:26,代码来源:api.py
示例4: _import_module
def _import_module(self):
cache = Cache('localhost')
_ = cache.set_stubo_setting("tracking_level", "full", True)
self.http_client.fetch(self.get_url('/stubo/api/exec/cmds?cmdfile=/static/cmds/formatted_date/response.commands'),
self.stop)
response = self.wait()
self.assertEqual(response.code, 200)
开发者ID:SpectoLabs,项目名称:mirage,代码行数:7,代码来源:test_multidaterolling_format.py
示例5: get_delay_policy
def get_delay_policy(handler, name, cache_loc):
cache = Cache(get_hostname(handler.request))
response = {
'version' : version
}
delay = cache.get_delay_policy(name, cache_loc)
response['data'] = delay or {}
return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:8,代码来源:api.py
示例6: list_stubs
def list_stubs(handler, scenario_name, host=None):
cache = Cache(host or get_hostname(handler.request))
scenario = Scenario()
stubs = scenario.get_stubs(cache.scenario_key_name(scenario_name))
result = dict(version=version, data=dict(scenario=scenario_name))
if stubs:
result['data']['stubs'] = [x['stub'] for x in stubs]
return result
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:8,代码来源:api.py
示例7: get_session_status
def get_session_status(handler, all_hosts=True):
scenario = Scenario()
host_scenarios = defaultdict()
# getting a dictionary with sizes for all scenarios
scenario_sizes = scenario.size()
scenarios_recorded = scenario.recorded()
for s in scenario.get_all():
host, scenario_name = s['name'].split(':')
if not all_hosts and get_hostname(handler.request) != host:
continue
if host not in host_scenarios:
host_scenarios[host] = {}
# getting session data
sessions = []
cache = Cache(host)
for session_name, session in cache.get_sessions(scenario_name):
# try and get the last_used from the last tracker get/response
# else when the begin/session playback was called
last_used = session_last_used(s['name'], session_name, 'playback')
if last_used:
last_used = last_used['start_time'].strftime('%Y-%m-%d %H:%M:%S')
else:
# session has never been used for playback
last_used = session.get('last_used', '-')
session['last_used'] = last_used
# removing stub information since we aren't using it anyway and it can consume a lot of memory
session.pop('stubs', None)
# creating sessions list
sessions.append(session)
# getting stub count
stub_counts = stub_count(host, scenario_name)['data']['count']
recorded = '-'
# adding session information
if sessions:
if stub_counts:
# getting scenario size and recorded values
scenario_size = 0
try:
scenario_size = scenario_sizes[s['name']]
recorded = scenarios_recorded[s['name']]
except KeyError:
log.debug("Could not get scenario size for: %s" % s['name'])
except Exception as ex:
log.warn("Failed to get scenario size for: %s, got error: %s" % (s['name'], ex))
# creating a dict with scenario information
host_scenarios[host][scenario_name] = (sessions, stub_counts, recorded, round(scenario_size, 0))
else:
host_scenarios[host][scenario_name] = (sessions, 0, '-', 0)
return host_scenarios
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:56,代码来源:api.py
示例8: delete_delay_policy
def delete_delay_policy(handler, names):
cache = Cache(get_hostname(handler.request))
response = {
'version' : version
}
num_deleted = cache.delete_delay_policy(names)
response['data'] = {
'message' : 'Deleted {0} delay policies from {1}'.format(num_deleted,
names)
}
return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:11,代码来源:api.py
示例9: end_sessions
def end_sessions(handler, scenario_name):
response = {
'version' : version,
'data' : {}
}
cache = Cache(get_hostname(handler.request))
sessions = list(cache.get_sessions_status(scenario_name,
status=('record', 'playback')))
for session_name, session in sessions:
session_response = end_session(handler, session_name)
response['data'][session_name] = session_response.get('data')
return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:12,代码来源:api.py
示例10: get_setting
def get_setting(handler, host, setting=None):
all_hosts = True if host == 'all' else False
if all_hosts:
host = get_hostname(handler.request)
cache = Cache(host)
result = cache.get_stubo_setting(setting, all_hosts)
response = dict(version=version, data=dict(host=host, all=all_hosts))
if setting:
response['data'][setting] = result
else:
response['data']['settings'] = result
return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:12,代码来源:api.py
示例11: get
def get(self, scenario_name):
"""
Returns scenario name, current sessions and their states,
stub count, total, document size. Also, provides direct URL link to stub list.
:param scenario_name: <string> scenario name
Response JSON:
{
"stubs": 32,
"space_used_kb": 840,
"recorded": "2015-07-15",
"name": "localhost:scenario_16",
"scenarioRef": "/stubo/api/v2/scenarios/objects/localhost:scenario_16"
}
"""
# check if hostname is supplied - if not, override scenario name with new value
scenario_name = _get_scenario_full_name(self, scenario_name)
# query MongoDB
document = yield self.db.scenario.find_one({'name': scenario_name})
# form a result dictionary
if document is not None:
# get stub count
stub_count = yield self.db.scenario_stub.find({'scenario': scenario_name}).count()
# get size
scenario_cl = Scenario()
size = scenario_cl.size(scenario_name)
# check if size is None
if size is None:
size = 0
recorded = scenario_cl.recorded(scenario_name)
if recorded is None:
recorded = '-'
host, scenario = scenario_name.split(':')
# getting session data
sessions = []
cache = Cache(host)
for session_info in cache.get_scenario_sessions_information(scenario):
sessions.append(session_info)
result_dict = {'name': scenario_name,
'stub_count': stub_count,
'recorded': recorded,
'space_used_kb': int(size),
'scenarioRef': '/stubo/api/v2/scenarios/objects/{0}'.format(scenario_name),
'sessions': sessions}
self.set_status(200)
self.write(result_dict)
else:
self.send_error(404)
开发者ID:JohnFDavenport,项目名称:mirage,代码行数:53,代码来源:handlers.py
示例12: StubCache
class StubCache(StubData):
def __init__(self, payload, scenario, session_name):
StubData.__init__(self, payload, scenario)
self.session_name = session_name
from stubo.cache import Cache
self.cache = Cache(self.hostname)
def id(self):
return "{0} => {1}".format(self.scenario_key(), self.session_name)
def request_index_id(self):
matchers = u"".join([u"".join(x.split()).strip() for x in self.contains_matchers() or []])
return compute_hash(
u"".join(
[
self.scenario_name,
matchers,
self.request_path() or "",
self.request_method(),
self.request_query_args() or "",
]
)
)
def load_from_cache(self, response_ids, delay_policy_name, recorded, system_date, module_info, request_index_key):
self.payload = dict(response=dict(ids=response_ids))
response = self.get_response_from_cache(request_index_key)
self.payload["response"] = response
self.set_recorded(recorded)
if module_info:
self.set_module(module_info)
if delay_policy_name:
self.load_delay_from_cache(delay_policy_name)
def get_response_from_cache(self, request_index_key):
# look up response
return self.cache.get_response(self.scenario_name, self.session_name, self.response_ids(), request_index_key)
def load_delay_from_cache(self, name):
self.set_delay_policy(self.cache.get_delay_policy(name))
def response_ids(self):
return self.response()["ids"]
def delay_policy_name(self):
name = ""
policy = self.delay_policy()
if policy:
name = policy.get("name")
return name
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:51,代码来源:stub.py
示例13: delete_scenario
def delete_scenario(scenario_name_key, force):
log.debug(u'delete_scenario: {0}'.format(scenario_name_key))
host, scenario_name = scenario_name_key.split(':')
cache = Cache(host)
if not force:
active_sessions = cache.get_active_sessions(scenario_name,
local=False)
if active_sessions:
raise exception_response(400,
title='Sessons in playback/record, can not delete. Found th'
'e following active sessions: {0} for scenario: {1}'.format(
active_sessions, scenario_name))
scenario_db.remove_all(scenario_name_key)
cache.delete_caches(scenario_name)
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:15,代码来源:api.py
示例14: put_setting
def put_setting(handler, setting, value, host):
response = {
'version' : version
}
all_hosts = True if host == 'all' else False
if all_hosts:
host = get_hostname(handler.request)
cache = Cache(host)
new_setting = cache.set_stubo_setting(setting, value, all_hosts)
response['data'] = {
'host' : host,
'all' : all_hosts,
'new' : 'true' if new_setting else 'false', setting : value
}
return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:15,代码来源:api.py
示例15: put_stub
def put_stub(handler, session_name, delay_policy, stateful, priority,
recorded=None, module_name=None, recorded_module_system_date=None):
log.debug('put_stub request: {0}'.format(handler.request))
request = handler.request
stubo_request = StuboRequest(request)
session_name = session_name.partition(',')[0]
cache = Cache(get_hostname(request))
scenario_key = cache.find_scenario_key(session_name)
trace = TrackTrace(handler.track, 'put_stub')
url_args = handler.track.request_params
err_msg = 'put/stub body format error - {0}, for session: {1}'
try:
stub = parse_stub(stubo_request.body_unicode, scenario_key, url_args)
except Exception, e:
raise exception_response(400, title=err_msg.format(e.message,
session_name))
开发者ID:rusenask,项目名称:mirage,代码行数:16,代码来源:api.py
示例16: import_bookmarks
def import_bookmarks(handler, location):
request = handler.request
cache = Cache(get_hostname(request))
response = dict(version=version, data={})
uri, bookmarks_name = UriLocation(request)(location)
log.info('uri={0}, bookmarks_name={1}'.format(uri, bookmarks_name))
payload, _, status_code = UrlFetch().get(uri)
payload = json.loads(payload)
# e.g payload
#{"converse": {"first": {"8981c0dda19403f5cc054aea758689e65db2": "2"}}}
imported = {}
for scenario, bookmarks in payload.iteritems():
for bookmark_name, bookmark in bookmarks.iteritems():
is_new = cache.set_saved_request_index_data(scenario, bookmark_name,
bookmark)
scenario_book = '{0}:{1}'.format(scenario, bookmark_name)
imported[scenario_book] = ('new' if is_new else 'updated', bookmark)
response['data']['imported'] = imported
return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:19,代码来源:api.py
示例17: export_stubs
def export_stubs(handler, scenario_name):
cache = Cache(get_hostname(handler.request))
scenario_name_key = cache.scenario_key_name(scenario_name)
exporter = Exporter(static_dir = handler.settings['static_path'])
runnable = asbool(handler.get_argument('runnable', False))
playback_session = handler.get_argument('playback_session', None)
export_dir_path, files, runnable_info = exporter.export(scenario_name_key,
runnable=runnable,
playback_session=playback_session,
session_id=handler.get_argument('session_id', None),
export_dir=handler.get_argument('export_dir', None))
links = get_export_links(handler, scenario_name_key, files)
payload = dict(scenario=scenario_name, export_dir_path=export_dir_path,
links=links)
if runnable_info:
payload['runnable'] = runnable_info
return dict(version=version, data=payload)
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:19,代码来源:api.py
示例18: prepare
def prepare(self):
function = self.request.path
if function.startswith("/stubo/api/"):
function = function.partition("/stubo/api/")[-1]
elif function == "/stubo/default/execCmds":
# LEGACY
function = "exec/cmds"
args = self.skip_dotted_names(self.request.arguments)
headers = self.skip_dotted_names(self.request.headers)
self.track = ObjectDict(request_params=args, request_headers=headers, request_method=self.request.method)
host_parts = self.request.host.partition(":")
host = host_parts[0].lower()
port = host_parts[-1]
cache = Cache(host)
track_setting = cache.get_stubo_setting("tracking_level")
if not track_setting:
track_setting = cache.get_stubo_setting("tracking_level", all_hosts=True)
if not track_setting:
track_setting = "normal"
self.track.tracking_level = args.get("tracking_level", track_setting)
request_size = len(self.request.body)
# always track put/stub recordings
if self.track.tracking_level == "full" or function == "put/stub":
self.track.request_text = self.request.body
if function == "get/response" and request_size <= 0:
self.track.error = "NoTextInBody"
self.track.update(
dict(
function=function,
start_time=tsecs_to_date(self.request._start_time),
host=host,
port=port,
remote_ip=self.request.remote_ip,
server=socket.getfqdn(),
request_size=request_size,
)
)
log.debug("tracking: {0}:{1}".format(self.request.host, function))
开发者ID:rusenask,项目名称:mirage,代码行数:41,代码来源:track.py
示例19: get_delay_policy
def get_delay_policy(handler, name, cache_loc):
"""
Gets specified or all delays, returns dictionary
:param handler: RequestHandler (or TrackRequest, BaseHandler, etc..)
:param name: Delay name, if None is passed - gets all delays
:param cache_loc: Cache location, usually just 'master'
:return: dictionary of dictionaries with scenario information and reference URI and status_code for response
"""
cache = Cache(get_hostname(handler.request))
response = {
'version': version
}
status_code = 200
delays = cache.get_delay_policy(name, cache_loc)
# if delay policy not found but name was specified, return error message
if delays is None and name is not None:
status_code = 404
response['error'] = "Delay policy '%s' not found" % name
return response, status_code
# list for storing delay objects since cache.get_delay_policy(name, cache_loc) returns a dict
delay_list = []
# adding references
if name is None:
if delays is not None:
# All stored delays should be returned
for k, v in delays.items():
v['delayPolicyRef'] = "/stubo/api/v2/delay-policy/objects/%s" % k
delay_list.append(v)
else:
# Returning empty dict
delays = {}
delay_list.append(delays)
else:
delays['delayPolicyRef'] = "/stubo/api/v2/delay-policy/objects/%s" % name
delay_list.append(delays)
response['data'] = delay_list
return response, status_code
开发者ID:JohnFDavenport,项目名称:mirage,代码行数:41,代码来源:api_v2.py
示例20: jump_bookmark
def jump_bookmark(handler, name, sessions, index=None):
request = handler.request
cache = Cache(get_hostname(request))
response = dict(version=version, data = {})
scenario_key = cache.find_scenario_key(host, sessions[0])
scenario_name = scenario_key.partition(':')[-1]
if not all(cache.find_scenario_key(host, x) == scenario_key for x in sessions):
raise exception_response(400,
title="All sessions must belong to the same scenario")
index_state = cache.get_saved_request_index_data(scenario_name, name)
check_bookmark(host, name, index_state)
results = []
for session in sessions:
for k, v in index_state.iteritems():
v = v if index is None else index
session_key = '{0}:{1}'.format(session, k)
result = set_request_index_item(scenario_name, session_key, v)
results.append((k, v, result))
response['data']['results'] = results
return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:21,代码来源:api.py
注:本文中的stubo.cache.Cache类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论