• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python requests_cache.install_cache函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中requests_cache.install_cache函数的典型用法代码示例。如果您正苦于以下问题:Python install_cache函数的具体用法?Python install_cache怎么用?Python install_cache使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了install_cache函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

 def __init__(self, cache, http_cfg):
     """Store HTTP settings (filling in defaults) and optionally enable caching.

     Args:
         cache: kwargs dict for requests_cache.install_cache, or falsy to skip.
         http_cfg: per-call HTTP options; missing keys get the defaults below.
     """
     # Apply defaults only where the caller did not supply a value.
     defaults = dict(stream=True, timeout=30.1)
     for key, value in defaults.items():
         http_cfg.setdefault(key, value)
     self.config = DictLike(http_cfg)
     if cache:
         # Globally patch requests with the configured cache backend.
         requests_cache.install_cache(**cache)
开发者ID:toobaz,项目名称:pandaSDMX,代码行数:7,代码来源:remote.py


示例2: main

def main():
    """CLI entry point: parse arguments, install the HTTP cache, and print
    administrative regions fetched from the National Bureau of Statistics site.
    """
    global args
    parser = argparse.ArgumentParser(description='从国家统计局网站下载最新的行政区')
    parser.add_argument('input', const="", default="", type=str, nargs="?")
    parser.add_argument("--sqlite3", type=str, help='SQLite文件位置')
    parser.add_argument("--mysql", type=str, help='mysql dsn')
    parser.add_argument('--mysql-host', type=str, help='mysql host')
    parser.add_argument('--mysql-port', type=str, help='mysql port')
    parser.add_argument('--mysql-user', type=str, help='mysql user')
    parser.add_argument('--mysql-password', type=str, help='mysql password')
    parser.add_argument('--mysql-database', type=str, help='mysql database')
    parser.add_argument('--skip-province', type=int, help='跳过省份的第x个')
    parser.add_argument('--verbose', '-v', action='count', help='打印日志内容')
    parser.add_argument('--dump', action='store', default='txt', \
        help='输出内容的格式 csv txt xml json jsonp')
    parser.add_argument('--dump-children', action='store_true', \
        help='打印子级内容')
    parser.add_argument('--region-type', action='store', default='province', \
        help='')
    parser.add_argument('--requests-cache', action='store', \
        default='/tmp/cnregion_requests_cache.sqlite')

    args = parser.parse_args(sys.argv[1:])
    # Cache all HTTP requests in an sqlite file so re-runs don't re-download.
    requests_cache.install_cache(args.requests_cache)
    fetch.VERBOSE_LEVEL = args.verbose

    printer = Printer(args.dump)

    if args.region_type == "city":
        # NOTE(review): the body of the "city" branch is missing from this
        # snippet (it was truncated when the source was extracted); restore it
        # from the upstream project if city-level output is required.
        pass

    for province in fetch_provinces():
        # BUG FIX: Python 2 `print x` statement is a syntax error under
        # Python 3; use the print() function.
        print(printer.province(province))

if "__main__" == __name__:
    main()
开发者ID:sayi21cn,项目名称:CNRegion,代码行数:34,代码来源:cli.py


示例3: enable_cache

def enable_cache(fileprefix, cachetype, expiry):
    """
    Install a global requests cache, if the requests_cache package can be
    imported. Returns True when caching was newly enabled; False when it
    was already enabled or the package is unavailable.
    """

    global _CACHE_INSTALLED

    if _CACHE_INSTALLED:
        # Someone enabled it earlier in this process; do nothing.
        return False

    try:
        from requests_cache import install_cache
        from requests_cache.core import remove_expired_responses

        # Patch requests globally, then purge stale entries up front.
        install_cache(fileprefix, cachetype, expire_after=expiry)
        remove_expired_responses()

    except ImportError:
        # requests_cache is an optional dependency — degrade gracefully.
        return False

    else:
        _CACHE_INSTALLED = True
        return True
开发者ID:obriencj,项目名称:python-gnajom,代码行数:26,代码来源:__init__.py


示例4: setUp

 def setUp(self):
     """Route HTTP traffic through a local test cache and fix the time window."""
     # Cache file lives next to this test module ("test.sqlite").
     cache_path = os.path.join(os.path.dirname(__file__), "test")
     requests_cache.install_cache(
         cache_name=cache_path,
         allowable_methods=('GET', 'POST')
     )
     # Three-hour window on 2015-03-05 used by the playlist tests.
     self.ts_beg = datetime.datetime(2015, 3, 5, 0)
     self.ts_end = datetime.datetime(2015, 3, 5, 3)
开发者ID:gtnx,项目名称:nova-playlist,代码行数:7,代码来源:tests.py


示例5: cmd_cymon_ip_timeline

def cmd_cymon_ip_timeline(ip, no_cache, verbose, output, pretty):
    """Simple cymon API client.

    Prints the JSON result of a cymon IP timeline query.

    Example:

    \b
    $ habu.cymon.ip.timeline 8.8.8.8
    {
        "timeline": [
            {
                "time_label": "Aug. 18, 2018",
                "events": [
                    {
                        "description": "Posted: 2018-08-18 23:37:39 CEST IDS Alerts: 0 URLQuery Alerts: 1 ...",
                        "created": "2018-08-18T21:39:07Z",
                        "title": "Malicious activity reported by urlquery.net",
                        "details_url": "http://urlquery.net/report/b1393866-9b1f-4a8e-b02b-9636989050f3",
                        "tag": "malicious activity"
                    }
                ]
            },
            ...
    """

    # API key comes from the habu config file or environment.
    habucfg = loadcfg()

    if 'CYMON_APIKEY' not in habucfg:
        print('You must provide a cymon apikey. Use the ~/.habu.json file (variable CYMON_APIKEY), or export the variable HABU_CYMON_APIKEY')
        print('Get your API key from https://www.cymon.io/')
        sys.exit(1)

    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    if not no_cache:
        # Cache responses in an sqlite file under the user's home directory.
        home_directory = pwd.getpwuid(os.getuid()).pw_dir
        requests_cache.install_cache(home_directory + '/.habu_requests_cache')

    url = 'https://www.cymon.io:443/api/nexus/v1/ip/{}/timeline/'.format(ip)
    headers = { 'Authorization': 'Token {}'.format(habucfg['CYMON_APIKEY']) }

    response = requests.get(url, headers=headers)

    # 404 means the IP has no timeline; anything other than 200/404 is an error.
    if response.status_code == 404:
        print("Not Found")
        return False

    if response.status_code != 200:
        print('ERROR', response)
        return False

    data = response.json()

    if pretty:
        output.write(pretty_print(data))
        return

    output.write(json.dumps(data, indent=4))
    output.write('\n')
开发者ID:coolsnake,项目名称:habu,代码行数:60,代码来源:cmd_cymon_ip_timeline.py


示例6: crawl_command

def crawl_command(args):
    """Crawl builder metadata for every master and dump it to builder_stats.json.

    For each builder we record a tuple of
    (master name, builder name, first step name of a finished build, slave list).
    """
    # Cache HTTP responses so repeated runs don't re-hit the endpoints.
    requests_cache.install_cache('builder_stats')

    CBE_BASE = 'https://chrome-build-extract.appspot.com'
    MASTERS_URL = 'https://chrome-infra-stats.appspot.com/_ah/api/stats/v1/masters'
    master_names = requests.get(MASTERS_URL).json()['masters']

    builder_stats = []

    for master_name in master_names:
        cbe_master_url = '%s/get_master/%s' % (CBE_BASE, master_name)
        master_json = requests.get(cbe_master_url).json()
        for builder_name, builder_json in master_json['builders'].items():
            cbe_builds_url = '%s/get_builds' % CBE_BASE
            params = { 'master': master_name, 'builder': builder_name }
            response_json = requests.get(cbe_builds_url, params=params).json()
            builds = response_json['builds']
            if builds:
                # A build with no ETA has finished.
                # BUG FIX: the original bare next() raised StopIteration when
                # every listed build was still running; fall back to None.
                finished_build = next((b for b in builds if b['eta'] is None), None)
                first_step_name = finished_build['steps'][0]['name'] if finished_build else None
            else:
                first_step_name = None
            builder_tuple = (master_name, builder_name, first_step_name, builder_json['slaves'])
            # BUG FIX: Python 2 `print x` statement is a syntax error in Python 3.
            print(builder_tuple)
            builder_stats.append(builder_tuple)

    with open('builder_stats.json', 'w') as stats_file:
        json.dump(builder_stats, stats_file)
开发者ID:eseidel,项目名称:cycletimes,代码行数:29,代码来源:builder_stats.py


示例7: main

def main(argv=None):
    """Start the paasta API server.

    Loads system settings, builds a marathon client, installs a short-lived
    in-memory HTTP cache, and serves the WSGI app until interrupted.

    NOTE(review): `argv` is accepted but never used in this body —
    parse_paasta_api_args presumably reads sys.argv itself; confirm.
    """
    args = parse_paasta_api_args()
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.WARNING)

    if args.soa_dir:
        settings.soa_dir = args.soa_dir

    # Exit on exceptions while loading settings
    settings.cluster = load_system_paasta_config().get_cluster()

    marathon_config = marathon_tools.load_marathon_config()
    settings.marathon_client = marathon_tools.get_marathon_client(
        marathon_config.get_url(),
        marathon_config.get_username(),
        marathon_config.get_password()
    )

    # Set up transparent cache for http API calls. With expire_after, responses
    # are removed only when the same request is made. Expired storage is not a
    # concern here. Thus remove_expired_responses is not needed.
    requests_cache.install_cache("paasta-api", backend="memory", expire_after=30)

    # Blocking server: serve_forever() only returns on interrupt.
    server = WSGIServer(('', int(args.port)), make_app())
    log.info("paasta-api started on port %d with soa_dir %s" % (args.port, settings.soa_dir))

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        sys.exit(0)
开发者ID:gstarnberger,项目名称:paasta,代码行数:32,代码来源:api.py


示例8: __init__

  def __init__(self, api_key, response_format='json'):
    """Configure the OMIM API client and enable a long-lived response cache.

    Args:
        api_key: OMIM API key sent with each request.
        response_format: response encoding requested from the API (default json).
    """
    super(OMIM, self).__init__()
    self.api_key = api_key
    self.format = response_format
    self.base_url = 'http://api.omim.org/api'

    # Cache OMIM responses in sqlite; expire_after=8460000 s (~98 days).
    requests_cache.install_cache('omim_cache', backend='sqlite', expire_after=8460000)
开发者ID:Clinical-Genomics,项目名称:GeneLists,代码行数:7,代码来源:omim.py


示例9: cli

def cli(debug):
    """Top-level CLI setup: HTTP caching, logging, and debug post-mortem hook."""
    log_level = logging.DEBUG if debug else logging.INFO
    # Cache Federal Register responses for 3 days.
    requests_cache.install_cache('fr_cache', expire_after=60*60*24*3)
    if debug:
        # Drop into ipdb post-mortem on any uncaught exception.
        sys.excepthook = lambda t, v, tb: ipdb.post_mortem(tb)
    coloredlogs.install(level=log_level, fmt="%(levelname)s %(message)s")
开发者ID:vrajmohan,项目名称:regulations-parser,代码行数:7,代码来源:eregs.py


示例10: get_keyboard_data

def get_keyboard_data(keyboardID, weekCache=False):
	"""
	Get Keyboard or package data from web api.

	Args:
		keyboardID (str): Keyboard or package ID
		weekCache (bool) : cache data for 1 week, default is 1 day
	Returns:
		dict: Keyboard data, or None when the request did not return 200
	"""
	logging.info("Getting data for keyboard %s", keyboardID)
	api_url = "https://api.keyman.com/keyboard/" + keyboardID
	logging.debug("At URL %s", api_url)
	# Removed unused local `home` (dead code in the original).
	cache_dir = keyman_cache_dir()
	current_dir = os.getcwd()
	if weekCache:
		expire_after = datetime.timedelta(days=7)
	else:
		expire_after = datetime.timedelta(days=1)
	# The sqlite cache file is created relative to the cwd, so switch into
	# the cache directory for the duration of the request, then restore.
	os.chdir(cache_dir)
	requests_cache.install_cache(cache_name='keyman_cache', backend='sqlite', expire_after=expire_after)
	now = time.ctime(int(time.time()))
	response = requests.get(api_url)
	logging.debug("Time: {0} / Used Cache: {1}".format(now, response.from_cache))
	os.chdir(current_dir)
	# Unpatch requests so later calls in this process are uncached.
	requests_cache.core.uninstall_cache()
	if response.status_code == 200:
		return response.json()
	else:
		return None
开发者ID:tavultesoft,项目名称:keymanweb,代码行数:31,代码来源:get_kmp.py


示例11: get_api_keyboards

def get_api_keyboards(verbose=False):
	"""
	Get Keyboards data from web api.

	Args:
		verbose(bool, default False): verbose output
	Returns:
		dict: Keyboard data
		None: if http request not successful
	"""
	api_url = "https://api.keyman.com/cloud/4.0/keyboards?version=10.0"
	headers = {'Content-Type': 'application/json',
		'Accept-Encoding': 'gzip, deflate, br'}
	# Removed unused local `home` and dead commented-out json.loads fallback.
	cache_dir = keyman_cache_dir()
	current_dir = os.getcwd()
	expire_after = datetime.timedelta(days=1)
	if not os.path.isdir(cache_dir):
		os.makedirs(cache_dir)
	# The sqlite cache file is created relative to the cwd, so switch into
	# the cache directory for the duration of the request, then restore.
	os.chdir(cache_dir)
	requests_cache.install_cache(cache_name='keyman_cache', backend='sqlite', expire_after=expire_after)
	now = time.ctime(int(time.time()))
	response = requests.get(api_url, headers=headers)
	if verbose:
		print("Time: {0} / Used Cache: {1}".format(now, response.from_cache))
	os.chdir(current_dir)
	# NOTE(review): unlike get_keyboard_data, the cache is left installed
	# here (no uninstall_cache call) — confirm that is intentional.
	if response.status_code == 200:
		return response.json()
	else:
		return None
开发者ID:tavultesoft,项目名称:keymanweb,代码行数:31,代码来源:keymankeyboards.py


示例12: request

    def request(self, method, url, params=None, headers=None, to_json=True, data=None, **kwargs):
        """ Make request to TC API.

        Installs/uninstalls the requests_cache patch according to the
        client's 'cache' option before issuing the request. Returns the
        parsed JSON body when to_json is True, otherwise the raw response.
        Raises TCException on HTTP errors or unparseable JSON.
        """

        # self.prepare is defined elsewhere — presumably merges client-level
        # defaults/auth into the per-call arguments; confirm against the class.
        url, params, headers, data = self.prepare(url, params, headers, data)

        if self.options['cache']:
            # (Re)install the global requests_cache patch for this cache name.
            rc.install_cache(self.options['cache'])

        elif type(self).cache_installed:
            # Cache was active from a previous call but is now disabled.
            rc.uninstall_cache()

        # Track the patch state on the class so all instances stay consistent.
        type(self).cache_installed = bool(self.options['cache'])

        try:
            response = rs.api.request(
                method, url, params=params, headers=headers, data=data, **kwargs)
            logger.debug(response.content)
            response.raise_for_status()
            if to_json:
                # ValueError here means the body was not valid JSON.
                response = response.json()

        except (ValueError, rs.HTTPError):
            # 'response' is unbound if rs.api.request itself raised, hence the
            # locals() lookup: only wrap errors that carry an HTTP response.
            if locals().get('response') is not None:
                message = "%s: %s" % (response.status_code, response.content)
                raise TCException(message)
            raise

        return response
开发者ID:Dipsomaniac,项目名称:ticketscloud,代码行数:28,代码来源:ticketscloud.py


示例13: setup

    def setup(self):
        """Prepare importer state: data sources, owning organization, and
        name-based lookup tables for tprek places.

        Optionally installs an HTTP cache when the importer was run with
        the 'cached' option.
        """
        defaults = dict(name='Matkailu- ja kongressitoimisto')
        # Data source for this importer itself (id == importer name).
        self.data_source, _ = DataSource.objects.get_or_create(id=self.name, defaults=defaults)
        self.tprek_data_source = DataSource.objects.get(id='tprek')

        ytj_ds, _ = DataSource.objects.get_or_create(defaults={'name': 'YTJ'}, id='ytj')

        # Organization identified by its Finnish business ID within YTJ.
        org_args = dict(origin_id='0586977-6', data_source=ytj_ds)
        defaults = dict(name='Helsingin Markkinointi Oy')

        self.organization, _ = Organization.objects.get_or_create(
            defaults=defaults, **org_args)

        place_list = Place.objects.filter(data_source=self.tprek_data_source, deleted=False)
        deleted_place_list = Place.objects.filter(data_source=self.tprek_data_source,
                                                  deleted=True)
        # Get only places that have unique names
        place_list = place_list.annotate(count=Count('name_fi')).filter(count=1).values('id', 'origin_id', 'name_fi')
        deleted_place_list = deleted_place_list.annotate(count=Count('name_fi')).\
            filter(count=1).values('id', 'origin_id', 'name_fi', 'replaced_by_id')
        # Lookup tables keyed by lower-cased Finnish name.
        self.tprek_by_name = {p['name_fi'].lower(): (p['id'], p['origin_id']) for p in place_list}
        self.deleted_tprek_by_name = {
            p['name_fi'].lower(): (p['id'], p['origin_id'], p['replaced_by_id'])
            for p in deleted_place_list}

        if self.options['cached']:
            # Cache HTTP requests made during the import run.
            requests_cache.install_cache('matko')
开发者ID:City-of-Helsinki,项目名称:linkedevents,代码行数:27,代码来源:matko.py


示例14: test_expire_after_installed

 def test_expire_after_installed(self):
     """First request misses the cache; the repeat is served from it."""
     requests_cache.install_cache(name=CACHE_NAME, backend=CACHE_BACKEND)
     # Expire this URL's cached entry after 2 seconds.
     requests_cache.expire_after('http://httpbin.org/get', 2)
     first = requests.get('http://httpbin.org/get')
     self.assertFalse(first.from_cache)
     second = requests.get('http://httpbin.org/get')
     self.assertTrue(second.from_cache)
开发者ID:jherre,项目名称:requests-cache,代码行数:7,代码来源:test_monkey_patch.py


示例15: install_cache

def install_cache(expire_after=12 * 3600):
    """
    Patches the requests library with requests_cache.

    Only GET responses are cached; entries expire after *expire_after*
    seconds (12 hours by default).
    """
    requests_cache.install_cache(
        allowable_methods=('GET',),
        expire_after=expire_after)
开发者ID:kumar303,项目名称:data-services-helpers,代码行数:7,代码来源:dshelpers.py


示例16: cmd_crtsh

def cmd_crtsh(domain, no_cache, no_validate, verbose):
    """Downloads the certificate transparency logs for a domain
    and check with DNS queries if each subdomain exists.

    Uses multithreading to improve the performance of the DNS queries.

    Example:

    \b
    $ sudo habu.crtsh securetia.com
    [
        "karma.securetia.com.",
        "www.securetia.com."
    ]
    """

    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    if not no_cache:
        # Cache crt.sh responses for an hour in the user's home directory.
        home_directory = pwd.getpwuid(os.getuid()).pw_dir
        requests_cache.install_cache(home_directory + '/.habu_requests_cache', expire_after=3600)

    if verbose:
        print("Downloading subdomain list from https://crt.sh ...", file=sys.stderr)

    response = requests.get("https://crt.sh/?q=%.{d}&output=json".format(d=domain))

    if response.status_code != 200:
        print("[X] Information not available!")
        exit(1)

    records = json.loads(response.text)

    # Wildcard entries can't be resolved; keep only concrete host names.
    subdomains = list({
        record['name_value'].lower()
        for record in records
        if '*' not in record['name_value'].lower()
    })

    if no_validate:
        print(json.dumps(sorted(subdomains), indent=4))
        return True

    if verbose:
        print("Validating subdomains against DNS servers ...", file=sys.stderr)

    answers = query_bulk(subdomains)

    # Keep the qname of every subdomain that actually resolved.
    validated = [str(answer.qname) for answer in answers if answer]

    print(json.dumps(sorted(validated), indent=4))
    return True
开发者ID:portantier,项目名称:habu,代码行数:60,代码来源:cmd_crtsh.py


示例17: handle

    def handle(self, *args, **options):
        """Management-command entry point: run the importer selected by the
        'module' option for every requested import type.

        Raises CommandError when no importer name is given or it is unknown.
        """
        if options['cached']:
            # Cache HTTP responses so repeated runs don't re-download data.
            requests_cache.install_cache('resources_import')

        importers = get_importers()
        imp_list = ', '.join(sorted(importers.keys()))
        imp_name = options.get('module')
        if not imp_name:
            raise CommandError("Enter the name of the importer module. Valid importers: %s" % imp_list)
        if imp_name not in importers:
            # BUG FIX: the message previously interpolated args[0], which
            # raises IndexError when the name comes via options and args is
            # empty; use the name that actually failed the lookup.
            raise CommandError("Importer %s not found. Valid importers: %s" % (imp_name, imp_list))
        imp_class = importers[imp_name]
        importer = imp_class(options)

        # Activate the default language for the duration of the import
        # to make sure translated fields are populated correctly.
        default_language = settings.LANGUAGES[0][0]
        for imp_type in self.importer_types:
            name = "import_%s" % imp_type
            method = getattr(importer, name, None)
            if options[imp_type]:
                # Explicitly requested type must be supported.
                if not method:
                    raise CommandError("Importer %s does not support importing %s" % (name, imp_type))
            else:
                # Not requested: only run it under --all.
                if not options['all']:
                    continue

            if method:
                # Run each import type atomically in the default language.
                with override(default_language), transaction.atomic():
                    kwargs = {}
                    url = options.pop('url', None)
                    if url:
                        kwargs['url'] = url
                    method(**kwargs)
开发者ID:City-of-Helsinki,项目名称:respa,代码行数:34,代码来源:resources_import.py


示例18: test_fred

def test_fred():
    """Fetch FRED CPI series via MyDataReader and pandas-datareader and
    assert both readers return identical data."""

    filename = "fred"

    # expire_after is a module-level setting (seconds); negative disables caching.
    if expire_after >= 0:
        requests_cache.install_cache(filename, backend='sqlite', expire_after=expire_after) # expiration seconds
        logging.info("Installing cache '%s.sqlite' with expire_after=%d (seconds)" % (filename, expire_after))
    if expire_after == 0:
        logging.warning("expire_after==0 no cache expiration!")

    start = datetime.datetime(2010, 1, 1)
    end = datetime.datetime(2013, 1, 27)

    name = ["CPIAUCSL", "CPILFESL"]

    data = MyDataReader("FRED").get(name, start, end)
    print(data)

    gdp = web.DataReader(name, "fred", start, end)

    print(gdp)
    print(type(gdp))
    # BUG FIX: DataFrame.ix was deprecated in pandas 0.20 and removed in
    # 1.0; .loc is the label-based equivalent.
    print(gdp.loc['2013-01-01'])
    print(gdp.dtypes)

    # Both sources must agree exactly, element-wise.
    diff = gdp - data
    assert(diff.sum().sum() == 0)
开发者ID:gitter-badger,项目名称:pandas_datareaders,代码行数:32,代码来源:test_datareader_fred.py


示例19: install_cache_requests

def install_cache_requests():
    """Globally patch requests with an sqlite-backed cache configured in conf.

    Caches GET and HEAD responses; fast_save trades durability for speed
    when conf.ASYNC_CACHE_WRITES is enabled.
    """
    requests_cache.install_cache(
        cache_name=conf.REQUESTS_CACHE,
        backend='sqlite',
        extension='.sqlite3',
        fast_save=conf.ASYNC_CACHE_WRITES,
        allowable_methods=('GET', 'HEAD'),
    )
开发者ID:elifesciences,项目名称:bot-lax-adaptor,代码行数:7,代码来源:cache_requests.py


示例20: get_vhosts

def get_vhosts(ip, first=1, no_cache=False):
    """Returns a list of webs hosted on IP (checks bing.com)

    Args:
        ip: IP address to query with Bing's ``ip:`` search operator.
        first: index of the first search result page entry to request.
        no_cache: when True, skip installing the local requests cache.

    >>> 'www.bing.com' in get_vhosts('204.79.197.200')
    True
    """

    if not no_cache:
        # Cache Bing responses in the user's home directory.
        homedir = pwd.getpwuid(os.getuid()).pw_dir
        requests_cache.install_cache(homedir + '/.habu_requests_cache')

    url = "http://www.bing.com/search?q=ip:{ip}&first={first}".format(ip=ip, first=first)

    response = requests.get(url)

    soup = BeautifulSoup(response.text, "html.parser")

    vhosts = set()

    # Bing result titles are <h2><a href=...>; the host name is the third
    # '/'-separated field of an absolute URL (scheme://host/...).
    for h2 in soup.find_all('h2'):
        for link in h2.find_all('a'):
            href = link.get('href')

            if href.startswith('http://') or href.startswith('https://'):
                vhost = href.split('/')[2]
                vhosts.add(vhost)

    return list(vhosts)
开发者ID:coolsnake,项目名称:habu,代码行数:27,代码来源:vhosts.py



注:本文中的requests_cache.install_cache函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python requests_cache.CachedSession类代码示例发布时间:2022-05-26
下一篇:
Python requests_cache.disabled函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap