• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python cache.Cache类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中stubo.cache.Cache的典型用法代码示例。如果您正苦于以下问题：Python Cache类的具体用法？Python Cache怎么用？Python Cache使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Cache类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: delete

    def delete(self, scenario_name):
        """
        Delete a scenario document and clear the caches that belong to it.

        The name is first expanded with ``_get_scenario_full_name``. When a
        matching document exists in ``self.db.scenario`` it is removed and the
        handler answers 200; otherwise it answers 412. Cache cleanup is
        best-effort: any failure there is logged and does not change the
        response.

        :param scenario_name: <string> scenario name
        """
        full_name = _get_scenario_full_name(self, scenario_name)
        selector = {'name': full_name}

        # look the scenario up in MongoDB before touching anything
        existing = yield self.db.scenario.find_one(selector)
        if existing is None:
            self.send_error(412, reason="Precondition failed - scenario (%s) does not exist." % full_name)
            return

        yield self.db.scenario.remove(selector)
        try:
            # the full name has the form "<host>:<name>"; the host selects the cache
            host, name = full_name.split(":")
            Cache(host).delete_caches(name)
        except Exception as ex:
            log.warn("Failed to delete caches for scenario: %s. Got error: %s" % (full_name, ex))

        self.set_status(200)
开发者ID:JohnFDavenport,项目名称:mirage,代码行数:29,代码来源:handlers.py


示例2: export_stubs

def export_stubs(handler, scenario_name):
    """
    Export a scenario's stubs in both the commands format and the YAML format.

    :param handler: request handler; supplies the request, settings and the
        optional ``runnable``, ``playback_session``, ``session_id`` and
        ``export_dir`` arguments
    :param scenario_name: scenario name (without host prefix)
    :return: dict with ``version`` and a ``data`` payload holding the export
        directory, both sets of links and, when available, runnable info
    """
    from stubo.model.exporter import YAML_FORMAT_SUBDIR

    # old (commands) format first
    command_links = export_stubs_to_commands_format(handler, scenario_name)

    # then the new format, keyed by the host-qualified scenario name
    cache = Cache(get_hostname(handler.request))
    scenario_name_key = cache.scenario_key_name(scenario_name)

    exporter = Exporter(static_dir=handler.settings['static_path'])
    export_options = dict(
        runnable=asbool(handler.get_argument('runnable', False)),
        playback_session=handler.get_argument('playback_session', None),
        session_id=handler.get_argument('session_id', None),
        export_dir=handler.get_argument('export_dir', None),
    )
    export_dir_path, files, runnable_info = exporter.export(
        scenario_name_key, **export_options)

    yaml_location = scenario_name_key + "/" + YAML_FORMAT_SUBDIR
    yaml_links = get_export_links(handler, yaml_location, files)

    payload = {
        'scenario': scenario_name,
        'export_dir_path': export_dir_path,
        'command_links': command_links,
        'yaml_links': yaml_links,
    }
    if runnable_info:
        payload['runnable'] = runnable_info
    return {'version': version, 'data': payload}
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:25,代码来源:api.py


示例3: put_bookmark

def put_bookmark(handler, session_name, name):
    """
    Save the current request-index state of a session under a bookmark name.

    :param handler: request handler (used to resolve the host)
    :param session_name: session whose index entries are saved
    :param name: bookmark name to store the state under
    :return: dict with ``version`` and ``data`` mapping ``name`` to the saved
        index state
    :raises: the ``exception_response(400, ...)`` error when no session is
        given or the session has no index entries
    """
    cache = Cache(get_hostname(handler.request))
    if not session_name:
        raise exception_response(400, title="No session provided")

    # scenario keys look like "<host>:<scenario>"; keep the part after the colon
    scenario_name = cache.find_scenario_key(session_name).partition(':')[-1]

    # collect only the index entries that belong to the selected session
    index_state = {}
    request_index_data = cache.get_request_index_data(scenario_name)
    if request_index_data:
        for index_key, index_value in request_index_data.iteritems():
            owner, _, stub_key = index_key.partition(':')
            if owner != session_name:
                continue
            index_state[stub_key] = index_value

    if not index_state:
        raise exception_response(
            400,
            title="No indexes found for session '{0}'. Is the session in "
                  "playback mode and have state?".format(session_name))

    log.debug("save request index state '{0}' = {1}".format(name, index_state))
    cache.set_saved_request_index_data(scenario_name, name, index_state)
    return dict(version=version, data={name: index_state})
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:26,代码来源:api.py


示例4: _import_module

 def _import_module(self):
     """Enable full tracking for all hosts, run the formatted_date command
     file through the exec/cmds API and assert that the call returns 200."""
     Cache('localhost').set_stubo_setting("tracking_level", "full", True)
     cmds_url = self.get_url(
         '/stubo/api/exec/cmds?cmdfile=/static/cmds/formatted_date/response.commands')
     self.http_client.fetch(cmds_url, self.stop)
     response = self.wait()
     self.assertEqual(response.code, 200)
开发者ID:SpectoLabs,项目名称:mirage,代码行数:7,代码来源:test_multidaterolling_format.py


示例5: get_delay_policy

def get_delay_policy(handler, name, cache_loc):
    """
    Look up a delay policy through the cache of the requesting host.

    :param handler: request handler (used to resolve the host)
    :param name: delay policy name, passed through to the cache
    :param cache_loc: cache location, passed through to the cache
    :return: dict with ``version`` and ``data``; ``data`` is an empty dict
        when the cache returns nothing
    """
    cache = Cache(get_hostname(handler.request))
    delay = cache.get_delay_policy(name, cache_loc)
    return {'version': version, 'data': delay or {}}
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:8,代码来源:api.py


示例6: list_stubs

def list_stubs(handler, scenario_name, host=None):
    """
    Return the stubs stored for a scenario.

    :param handler: request handler; its host is used when ``host`` is falsy
    :param scenario_name: scenario name (without host prefix)
    :param host: optional host overriding the one from the request
    :return: dict with ``version`` and ``data``; ``data['stubs']`` is only
        present when the scenario has stubs
    """
    cache = Cache(host or get_hostname(handler.request))
    stubs = Scenario().get_stubs(cache.scenario_key_name(scenario_name))

    data = {'scenario': scenario_name}
    if stubs:
        data['stubs'] = [entry['stub'] for entry in stubs]
    return {'version': version, 'data': data}
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:8,代码来源:api.py


示例7: get_session_status

def get_session_status(handler, all_hosts=True):
    """
    Summarise sessions, stub counts, recorded dates and sizes per scenario.

    :param handler: request handler; its host is the filter when
        ``all_hosts`` is false
    :param all_hosts: when false, scenarios of other hosts are skipped
    :return: mapping ``host -> {scenario_name: (sessions, stub_count,
        recorded, size)}``. A scenario only gets an entry when it has at least
        one session; a host gets an (at least empty) entry as soon as one of
        its scenarios passes the host filter.
    """
    scenario = Scenario()
    host_scenarios = defaultdict()

    # size and recorded values for every scenario, fetched once up front
    sizes_by_name = scenario.size()
    recorded_by_name = scenario.recorded()

    for doc in scenario.get_all():
        full_name = doc['name']
        host, scenario_name = full_name.split(':')
        if not all_hosts and get_hostname(handler.request) != host:
            continue
        host_scenarios.setdefault(host, {})

        cache = Cache(host)
        sessions = []
        for session_name, session in cache.get_sessions(scenario_name):
            # prefer the time from the last tracked playback call, otherwise
            # fall back to whatever the session itself recorded
            tracked = session_last_used(full_name, session_name, 'playback')
            if tracked:
                session['last_used'] = tracked['start_time'].strftime('%Y-%m-%d %H:%M:%S')
            else:
                session['last_used'] = session.get('last_used', '-')
            # stub payloads are not used here and can be large
            session.pop('stubs', None)
            sessions.append(session)

        stub_counts = stub_count(host, scenario_name)['data']['count']

        if not sessions:
            continue
        if not stub_counts:
            host_scenarios[host][scenario_name] = (sessions, 0, '-', 0)
            continue

        scenario_size = 0
        recorded = '-'
        try:
            scenario_size = sizes_by_name[full_name]
            recorded = recorded_by_name[full_name]
        except KeyError:
            log.debug("Could not get scenario size for: %s" % full_name)
        except Exception as ex:
            log.warn("Failed to get scenario size for: %s, got error: %s" % (full_name, ex))

        host_scenarios[host][scenario_name] = (
            sessions, stub_counts, recorded, round(scenario_size, 0))

    return host_scenarios
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:56,代码来源:api.py


示例8: delete_delay_policy

def delete_delay_policy(handler, names):
    """
    Delete delay policies through the cache of the requesting host.

    :param handler: request handler (used to resolve the host)
    :param names: delay policy names, passed through to the cache
    :return: dict with ``version`` and a ``data.message`` reporting the value
        returned by the cache's ``delete_delay_policy``
    """
    cache = Cache(get_hostname(handler.request))
    num_deleted = cache.delete_delay_policy(names)
    message = 'Deleted {0} delay policies from {1}'.format(num_deleted, names)
    return {'version': version, 'data': {'message': message}}
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:11,代码来源:api.py


示例9: end_sessions

def end_sessions(handler, scenario_name):
    """
    End every session of a scenario that is in record or playback status.

    :param handler: request handler (used to resolve the host and passed on
        to ``end_session``)
    :param scenario_name: scenario whose sessions are ended
    :return: dict with ``version`` and ``data`` mapping each session name to
        the ``data`` part of its ``end_session`` response
    """
    cache = Cache(get_hostname(handler.request))
    # materialise the listing before ending anything, as the result is
    # iterated while end_session runs
    active = list(cache.get_sessions_status(scenario_name,
                                            status=('record', 'playback')))
    ended = {}
    for session_name, _ in active:
        ended[session_name] = end_session(handler, session_name).get('data')
    return {'version': version, 'data': ended}
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:12,代码来源:api.py


示例10: get_setting

def get_setting(handler, host, setting=None):
    """
    Read one stubo setting, or all of them, for a host.

    :param handler: request handler; supplies the host when ``host`` is 'all'
    :param host: host name, or the literal 'all' to request the all-hosts
        value (the request's host is then used to open the cache)
    :param setting: setting name; when falsy the result is stored under
        ``settings`` instead
    :return: dict with ``version`` and ``data`` (``host``, ``all`` and the
        result)
    """
    all_hosts = (host == 'all')
    if all_hosts:
        host = get_hostname(handler.request)

    result = Cache(host).get_stubo_setting(setting, all_hosts)

    data = dict(host=host, all=all_hosts)
    data[setting or 'settings'] = result
    return dict(version=version, data=data)
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:12,代码来源:api.py


示例11: get

    def get(self, scenario_name):
        """
        Write a summary of one scenario, or answer 404 when it does not exist.

        :param scenario_name: <string> scenario name; expanded with
            ``_get_scenario_full_name`` before the lookup

        Response JSON (keys as written by this handler):
        {
            "name": "localhost:scenario_16",
            "stub_count": 32,
            "recorded": "2015-07-15",
            "space_used_kb": 840,
            "scenarioRef": "/stubo/api/v2/scenarios/objects/localhost:scenario_16",
            "sessions": [...]
        }
        """
        full_name = _get_scenario_full_name(self, scenario_name)

        # query MongoDB
        document = yield self.db.scenario.find_one({'name': full_name})
        if document is None:
            self.send_error(404)
            return

        num_stubs = yield self.db.scenario_stub.find({'scenario': full_name}).count()

        scenario_cl = Scenario()
        size = scenario_cl.size(full_name)
        if size is None:
            size = 0
        recorded = scenario_cl.recorded(full_name)
        if recorded is None:
            recorded = '-'

        # the full name has the form "<host>:<scenario>"; the host selects the cache
        host, scenario = full_name.split(':')
        sessions = list(Cache(host).get_scenario_sessions_information(scenario))

        self.set_status(200)
        self.write({
            'name': full_name,
            'stub_count': num_stubs,
            'recorded': recorded,
            'space_used_kb': int(size),
            'scenarioRef': '/stubo/api/v2/scenarios/objects/{0}'.format(full_name),
            'sessions': sessions,
        })
开发者ID:JohnFDavenport,项目名称:mirage,代码行数:53,代码来源:handlers.py


示例12: StubCache

class StubCache(StubData):
    """StubData that additionally carries a session name and a Cache opened
    for ``self.hostname``, used to load responses and delay policies."""

    def __init__(self, payload, scenario, session_name):
        StubData.__init__(self, payload, scenario)
        self.session_name = session_name
        # imported here rather than at module level, as in the original
        from stubo.cache import Cache

        self.cache = Cache(self.hostname)

    def id(self):
        """Return "<scenario key> => <session name>"."""
        return "{0} => {1}".format(self.scenario_key(), self.session_name)

    def request_index_id(self):
        """Return ``compute_hash`` of the scenario name, the whitespace-free
        contains matchers, and the request path, method and query args."""
        compact_matchers = (
            u"".join(matcher.split()).strip()
            for matcher in self.contains_matchers() or []
        )
        matchers = u"".join(compact_matchers)
        parts = [
            self.scenario_name,
            matchers,
            self.request_path() or "",
            self.request_method(),
            self.request_query_args() or "",
        ]
        return compute_hash(u"".join(parts))

    def load_from_cache(self, response_ids, delay_policy_name, recorded, system_date, module_info, request_index_key):
        """Rebuild this stub's payload from cached data.

        The response ids are installed first so that
        ``get_response_from_cache`` can read them back via ``response_ids``.
        ``system_date`` is accepted but not used in this method.
        """
        self.payload = {"response": {"ids": response_ids}}
        self.payload["response"] = self.get_response_from_cache(request_index_key)
        self.set_recorded(recorded)
        if module_info:
            self.set_module(module_info)
        if delay_policy_name:
            self.load_delay_from_cache(delay_policy_name)

    def get_response_from_cache(self, request_index_key):
        """Fetch the response for this scenario/session from the cache."""
        return self.cache.get_response(
            self.scenario_name,
            self.session_name,
            self.response_ids(),
            request_index_key,
        )

    def load_delay_from_cache(self, name):
        """Set the delay policy to the one the cache returns for ``name``."""
        policy = self.cache.get_delay_policy(name)
        self.set_delay_policy(policy)

    def response_ids(self):
        """Return the ``ids`` entry of the current response."""
        return self.response()["ids"]

    def delay_policy_name(self):
        """Return the delay policy's ``name`` value, or "" when there is no
        delay policy."""
        policy = self.delay_policy()
        return policy.get("name") if policy else ""
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:51,代码来源:stub.py


示例13: delete_scenario

 def delete_scenario(scenario_name_key, force):
     """
     Remove a scenario from the database and clear its caches.

     :param scenario_name_key: host-qualified name, "<host>:<scenario>"
     :param force: when false, deletion is refused (400) if the cache reports
         active sessions for the scenario
     """
     log.debug(u'delete_scenario: {0}'.format(scenario_name_key))
     host, scenario_name = scenario_name_key.split(':')
     cache = Cache(host)

     # active sessions are only looked up when deletion is not forced
     active_sessions = None if force else cache.get_active_sessions(
         scenario_name, local=False)
     if active_sessions:
         # NOTE(review): the 'Sessons' spelling is runtime text and is kept
         # unchanged here; confirm nothing matches on it before correcting.
         message = ('Sessons in playback/record, can not delete. Found the '
                    'following active sessions: {0} for scenario: {1}')
         raise exception_response(
             400, title=message.format(active_sessions, scenario_name))

     scenario_db.remove_all(scenario_name_key)
     cache.delete_caches(scenario_name)
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:15,代码来源:api.py


示例14: put_setting

def put_setting(handler, setting, value, host):
    """
    Store a stubo setting for one host or for all hosts.

    :param handler: request handler; supplies the host when ``host`` is 'all'
    :param setting: setting name
    :param value: value to store
    :param host: host name, or the literal 'all' to store the all-hosts value
        (the request's host is then used to open the cache)
    :return: dict with ``version`` and ``data`` (``host``, ``all``, ``new`` as
        the string 'true'/'false', and ``setting`` mapped to ``value``)
    """
    all_hosts = (host == 'all')
    if all_hosts:
        host = get_hostname(handler.request)

    new_setting = Cache(host).set_stubo_setting(setting, value, all_hosts)

    data = {
        'host': host,
        'all': all_hosts,
        'new': 'true' if new_setting else 'false',
    }
    # assigned last so it wins if the setting name equals one of the keys above
    data[setting] = value
    return {'version': version, 'data': data}
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:15,代码来源:api.py


示例15: put_stub

def put_stub(handler, session_name, delay_policy, stateful, priority,
             recorded=None, module_name=None, recorded_module_system_date=None):
    """
    Parse the stub sent in a put/stub request for the given session.

    :param handler: request handler; supplies the request and ``track`` data
    :param session_name: session name; only the part before the first comma
        is used
    :param delay_policy: not used in the visible part of this function
    :param stateful: not used in the visible part of this function
    :param priority: not used in the visible part of this function
    :param recorded: not used in the visible part of this function
    :param module_name: not used in the visible part of this function
    :param recorded_module_system_date: not used in the visible part of this
        function
    :raises: the ``exception_response(400, ...)`` error when the request body
        cannot be parsed into a stub
    """
    log.debug('put_stub request: {0}'.format(handler.request))
    request = handler.request
    stubo_request = StuboRequest(request)
    session_name = session_name.partition(',')[0]
    cache = Cache(get_hostname(request))
    scenario_key = cache.find_scenario_key(session_name)
    # NOTE(review): `trace` and `stub` are not used after this point in the
    # code shown here - presumably the rest of the function follows; confirm.
    trace = TrackTrace(handler.track, 'put_stub')
    url_args = handler.track.request_params
    err_msg = 'put/stub body format error - {0}, for session: {1}'
    try:
        stub = parse_stub(stubo_request.body_unicode, scenario_key, url_args)
    except Exception as e:
        # `except ... as` is valid on Python 2.6+ and 3.x. `message` only
        # exists on Python 2 exceptions, so fall back to str(e) elsewhere.
        detail = e.message if hasattr(e, 'message') else str(e)
        raise exception_response(400, title=err_msg.format(detail,
                                                           session_name))
开发者ID:rusenask,项目名称:mirage,代码行数:16,代码来源:api.py


示例16: import_bookmarks

def import_bookmarks(handler, location):
    """
    Fetch a JSON bookmarks document and save every bookmark in the cache.

    The document maps scenario -> bookmark name -> saved index state, e.g.
    {"converse": {"first": {"8981c0dda19403f5cc054aea758689e65db2": "2"}}}

    :param handler: request handler (used to resolve the host and the URI)
    :param location: location of the bookmarks document, resolved through
        ``UriLocation``
    :return: dict with ``version`` and ``data.imported`` mapping
        "<scenario>:<bookmark>" to ``('new' | 'updated', bookmark)``
    """
    request = handler.request
    cache = Cache(get_hostname(request))
    uri, bookmarks_name = UriLocation(request)(location)
    log.info('uri={0}, bookmarks_name={1}'.format(uri, bookmarks_name))

    # only the body of the fetch result is used
    raw_payload, _, _ = UrlFetch().get(uri)

    imported = {}
    for scenario, bookmarks in json.loads(raw_payload).iteritems():
        for bookmark_name, bookmark in bookmarks.iteritems():
            is_new = cache.set_saved_request_index_data(
                scenario, bookmark_name, bookmark)
            outcome = 'new' if is_new else 'updated'
            imported['{0}:{1}'.format(scenario, bookmark_name)] = (outcome, bookmark)

    return dict(version=version, data={'imported': imported})
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:19,代码来源:api.py


示例17: export_stubs

def export_stubs(handler, scenario_name):
    """
    Export a scenario's stubs and return links to the exported files.

    :param handler: request handler; supplies the request, settings and the
        optional ``runnable``, ``playback_session``, ``session_id`` and
        ``export_dir`` arguments
    :param scenario_name: scenario name (without host prefix)
    :return: dict with ``version`` and a ``data`` payload holding the export
        directory, the links and, when available, runnable info
    """
    cache = Cache(get_hostname(handler.request))
    scenario_name_key = cache.scenario_key_name(scenario_name)

    exporter = Exporter(static_dir=handler.settings['static_path'])
    export_options = dict(
        runnable=asbool(handler.get_argument('runnable', False)),
        playback_session=handler.get_argument('playback_session', None),
        session_id=handler.get_argument('session_id', None),
        export_dir=handler.get_argument('export_dir', None),
    )
    export_dir_path, files, runnable_info = exporter.export(
        scenario_name_key, **export_options)

    payload = {
        'scenario': scenario_name,
        'export_dir_path': export_dir_path,
        'links': get_export_links(handler, scenario_name_key, files),
    }
    if runnable_info:
        payload['runnable'] = runnable_info
    return {'version': version, 'data': payload}
开发者ID:JohnFDavenport,项目名称:stubo-app,代码行数:19,代码来源:api.py


示例18: prepare

    def prepare(self):
        """Build ``self.track`` for the current request: the API function
        name, filtered arguments and headers, the tracking level, and basic
        request metadata (host, port, size, start time, remote IP, server)."""
        api_prefix = "/stubo/api/"
        path = self.request.path
        if path.startswith(api_prefix):
            function = path[len(api_prefix):]
        elif path == "/stubo/default/execCmds":
            # LEGACY
            function = "exec/cmds"
        else:
            function = path

        args = self.skip_dotted_names(self.request.arguments)
        headers = self.skip_dotted_names(self.request.headers)
        self.track = ObjectDict(request_params=args, request_headers=headers, request_method=self.request.method)

        host, _, port = self.request.host.partition(":")
        host = host.lower()

        # tracking level: request argument, else host setting, else all-hosts
        # setting, else "normal"
        cache = Cache(host)
        track_setting = (
            cache.get_stubo_setting("tracking_level")
            or cache.get_stubo_setting("tracking_level", all_hosts=True)
            or "normal"
        )
        self.track.tracking_level = args.get("tracking_level", track_setting)

        request_size = len(self.request.body)
        # always track put/stub recordings
        keep_body = self.track.tracking_level == "full" or function == "put/stub"
        if keep_body:
            self.track.request_text = self.request.body
            if function == "get/response" and request_size <= 0:
                self.track.error = "NoTextInBody"

        request_info = {
            "function": function,
            "start_time": tsecs_to_date(self.request._start_time),
            "host": host,
            "port": port,
            "remote_ip": self.request.remote_ip,
            "server": socket.getfqdn(),
            "request_size": request_size,
        }
        self.track.update(request_info)
        log.debug("tracking: {0}:{1}".format(self.request.host, function))
开发者ID:rusenask,项目名称:mirage,代码行数:41,代码来源:track.py


示例19: get_delay_policy

def get_delay_policy(handler, name, cache_loc):
    """
    Return one delay policy, or all of them, with reference URIs attached.

    :param handler: RequestHandler (or TrackRequest, BaseHandler, etc..)
    :param name: delay policy name; None requests all delay policies
    :param cache_loc: cache location, passed through to the cache
    :return: ``(response, status_code)``. ``response['data']`` is a list of
        delay dicts, each given a ``delayPolicyRef``; when no policies exist
        and no name was given it is a list holding one empty dict. When a
        named policy is missing, ``response['error']`` is set and the status
        is 404.
    """
    cache = Cache(get_hostname(handler.request))
    response = {'version': version}
    delays = cache.get_delay_policy(name, cache_loc)
    ref_template = "/stubo/api/v2/delay-policy/objects/%s"

    if name is not None:
        if delays is None:
            # a specific policy was asked for and does not exist
            response['error'] = "Delay policy '%s' not found" % name
            return response, 404
        delays['delayPolicyRef'] = ref_template % name
        delay_list = [delays]
    elif delays is None:
        # nothing stored: a single empty dict is returned
        delay_list = [{}]
    else:
        # all stored delays, each tagged with its own reference
        delay_list = []
        for policy_name, policy in delays.items():
            policy['delayPolicyRef'] = ref_template % policy_name
            delay_list.append(policy)

    response['data'] = delay_list
    return response, 200
开发者ID:JohnFDavenport,项目名称:mirage,代码行数:41,代码来源:api_v2.py


示例20: jump_bookmark

def jump_bookmark(handler, name, sessions, index=None):
    """
    Reset the request index of one or more sessions to a saved bookmark.

    :param handler: request handler (used to resolve the host)
    :param name: bookmark name whose saved index state is applied
    :param sessions: session names; all must resolve to the same scenario key
    :param index: when not None, this value is applied to every index entry
        instead of the saved one
    :return: dict with ``version`` and ``data.results``, a list of
        ``(stub_key, value, result)`` tuples
    :raises: the ``exception_response(400, ...)`` error when the sessions
        belong to different scenarios
    """
    request = handler.request
    # `host` was referenced below without ever being bound in this function;
    # bind it from the request, the same value the cache is opened with.
    host = get_hostname(request)
    cache = Cache(host)
    response = dict(version=version, data = {})
    # NOTE(review): other callers invoke cache.find_scenario_key with the
    # session name only; confirm whether passing `host` here is correct.
    scenario_key = cache.find_scenario_key(host, sessions[0])
    scenario_name = scenario_key.partition(':')[-1]
    if not all(cache.find_scenario_key(host, x) == scenario_key for x in sessions):
        raise exception_response(400,
          title="All sessions must belong to the same scenario")

    index_state = cache.get_saved_request_index_data(scenario_name, name)
    check_bookmark(host, name, index_state)
    results = []
    for session in sessions:
        for k, v in index_state.iteritems():
            # an explicit index overrides the saved value for every entry
            v = v if index is None else index
            session_key = '{0}:{1}'.format(session, k)
            result = set_request_index_item(scenario_name, session_key, v)
            results.append((k, v, result))
    response['data']['results'] = results
    return response
开发者ID:mithun-kumar,项目名称:stubo-app,代码行数:21,代码来源:api.py



注：本文中的stubo.cache.Cache类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python exceptions.exception_response函数代码示例发布时间:2022-05-27
下一篇:
Python teal.getHelpFileAsString函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap