本文整理汇总了Python中requests_cache.install_cache函数的典型用法代码示例。如果您正苦于以下问题:Python install_cache函数的具体用法?Python install_cache怎么用?Python install_cache使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了install_cache函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, cache, http_cfg):
default_cfg = dict(stream=True, timeout=30.1)
for it in default_cfg.items():
http_cfg.setdefault(*it)
self.config = DictLike(http_cfg)
if cache:
requests_cache.install_cache(**cache)
开发者ID:toobaz,项目名称:pandaSDMX,代码行数:7,代码来源:remote.py
示例2: main
def main():
global args
parser = argparse.ArgumentParser(description='从国家统计局网站下载最新的行政区')
parser.add_argument('input', const="", default="", type=str, nargs="?")
parser.add_argument("--sqlite3", type=str, help='SQLite文件位置')
parser.add_argument("--mysql", type=str, help='mysql dsn')
parser.add_argument('--mysql-host', type=str, help='mysql host')
parser.add_argument('--mysql-port', type=str, help='mysql port')
parser.add_argument('--mysql-user', type=str, help='mysql user')
parser.add_argument('--mysql-password', type=str, help='mysql password')
parser.add_argument('--mysql-database', type=str, help='mysql database')
parser.add_argument('--skip-province', type=int, help='跳过省份的第x个')
parser.add_argument('--verbose', '-v', action='count', help='打印日志内容')
parser.add_argument('--dump', action='store', default='txt', \
help='输出内容的格式 csv txt xml json jsonp')
parser.add_argument('--dump-children', action='store_true', \
help='打印子级内容')
parser.add_argument('--region-type', action='store', default='province', \
help='')
parser.add_argument('--requests-cache', action='store', \
default='/tmp/cnregion_requests_cache.sqlite')
args = parser.parse_args(sys.argv[1:])
requests_cache.install_cache(args.requests_cache)
fetch.VERBOSE_LEVEL = args.verbose
printer = Printer(args.dump)
if args.region_type == "city":
for province in fetch_provinces():
print printer.province(province)
if "__main__" == __name__:
main()
开发者ID:sayi21cn,项目名称:CNRegion,代码行数:34,代码来源:cli.py
示例3: enable_cache
def enable_cache(fileprefix, cachetype, expiry):
"""
If the requests_cache package is available, install a cache and
begin using it globally. Returns True if caching was successfully
enabled, and False otherwise (failed to enable, or enabled
already)
"""
global _CACHE_INSTALLED
if _CACHE_INSTALLED:
return False
try:
from requests_cache import install_cache
from requests_cache.core import remove_expired_responses
install_cache(fileprefix, cachetype, expire_after=expiry)
remove_expired_responses()
except ImportError:
return False
else:
_CACHE_INSTALLED = True
return True
开发者ID:obriencj,项目名称:python-gnajom,代码行数:26,代码来源:__init__.py
示例4: setUp
def setUp(self):
requests_cache.install_cache(
cache_name=os.path.join(os.path.dirname(__file__), "test"),
allowable_methods=('GET', 'POST')
)
self.ts_beg = datetime.datetime(2015, 3, 5, 0)
self.ts_end = datetime.datetime(2015, 3, 5, 3)
开发者ID:gtnx,项目名称:nova-playlist,代码行数:7,代码来源:tests.py
示例5: cmd_cymon_ip_timeline
def cmd_cymon_ip_timeline(ip, no_cache, verbose, output, pretty):
"""Simple cymon API client.
Prints the JSON result of a cymon IP timeline query.
Example:
\b
$ habu.cymon.ip.timeline 8.8.8.8
{
"timeline": [
{
"time_label": "Aug. 18, 2018",
"events": [
{
"description": "Posted: 2018-08-18 23:37:39 CEST IDS Alerts: 0 URLQuery Alerts: 1 ...",
"created": "2018-08-18T21:39:07Z",
"title": "Malicious activity reported by urlquery.net",
"details_url": "http://urlquery.net/report/b1393866-9b1f-4a8e-b02b-9636989050f3",
"tag": "malicious activity"
}
]
},
...
"""
habucfg = loadcfg()
if 'CYMON_APIKEY' not in habucfg:
print('You must provide a cymon apikey. Use the ~/.habu.json file (variable CYMON_APIKEY), or export the variable HABU_CYMON_APIKEY')
print('Get your API key from https://www.cymon.io/')
sys.exit(1)
if verbose:
logging.basicConfig(level=logging.INFO, format='%(message)s')
if not no_cache:
homedir = pwd.getpwuid(os.getuid()).pw_dir
requests_cache.install_cache(homedir + '/.habu_requests_cache')
url = 'https://www.cymon.io:443/api/nexus/v1/ip/{}/timeline/'.format(ip)
headers = { 'Authorization': 'Token {}'.format(habucfg['CYMON_APIKEY']) }
r = requests.get(url, headers=headers)
if r.status_code not in [200, 404]:
print('ERROR', r)
return False
if r.status_code == 404:
print("Not Found")
return False
data = r.json()
if pretty:
output.write(pretty_print(data))
else:
output.write(json.dumps(data, indent=4))
output.write('\n')
开发者ID:coolsnake,项目名称:habu,代码行数:60,代码来源:cmd_cymon_ip_timeline.py
示例6: crawl_command
def crawl_command(args):
requests_cache.install_cache('builder_stats')
CBE_BASE = 'https://chrome-build-extract.appspot.com'
MASTERS_URL = 'https://chrome-infra-stats.appspot.com/_ah/api/stats/v1/masters'
master_names = requests.get(MASTERS_URL).json()['masters']
builder_stats = []
for master_name in master_names:
cbe_master_url = '%s/get_master/%s' % (CBE_BASE, master_name)
master_json = requests.get(cbe_master_url).json()
# print master_json['slaves'].keys()
for builder_name, builder_json in master_json['builders'].items():
cbe_builds_url = '%s/get_builds' % CBE_BASE
params = { 'master': master_name, 'builder': builder_name }
response_json = requests.get(cbe_builds_url, params=params).json()
builds = response_json['builds']
if builds:
finished_build = next(b for b in builds if b['eta'] is None)
first_step_name = finished_build['steps'][0]['name']
else:
first_step_name = None
builder_tuple = (master_name, builder_name, first_step_name, builder_json['slaves'])
print builder_tuple
builder_stats.append(builder_tuple)
with open('builder_stats.json', 'w') as stats_file:
json.dump(builder_stats, stats_file)
开发者ID:eseidel,项目名称:cycletimes,代码行数:29,代码来源:builder_stats.py
示例7: main
def main(argv=None):
args = parse_paasta_api_args()
if args.debug:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.WARNING)
if args.soa_dir:
settings.soa_dir = args.soa_dir
# Exit on exceptions while loading settings
settings.cluster = load_system_paasta_config().get_cluster()
marathon_config = marathon_tools.load_marathon_config()
settings.marathon_client = marathon_tools.get_marathon_client(
marathon_config.get_url(),
marathon_config.get_username(),
marathon_config.get_password()
)
# Set up transparent cache for http API calls. With expire_after, responses
# are removed only when the same request is made. Expired storage is not a
# concern here. Thus remove_expired_responses is not needed.
requests_cache.install_cache("paasta-api", backend="memory", expire_after=30)
server = WSGIServer(('', int(args.port)), make_app())
log.info("paasta-api started on port %d with soa_dir %s" % (args.port, settings.soa_dir))
try:
server.serve_forever()
except KeyboardInterrupt:
sys.exit(0)
开发者ID:gstarnberger,项目名称:paasta,代码行数:32,代码来源:api.py
示例8: __init__
def __init__(self, api_key, response_format='json'):
super(OMIM, self).__init__()
self.base_url = 'http://api.omim.org/api'
self.format = response_format
self.api_key = api_key
requests_cache.install_cache('omim_cache', backend='sqlite', expire_after=8460000)
开发者ID:Clinical-Genomics,项目名称:GeneLists,代码行数:7,代码来源:omim.py
示例9: cli
def cli(debug):
log_level = logging.INFO
requests_cache.install_cache('fr_cache', expire_after=60*60*24*3) # 3 days
if debug:
log_level = logging.DEBUG
sys.excepthook = lambda t, v, tb: ipdb.post_mortem(tb)
coloredlogs.install(level=log_level, fmt="%(levelname)s %(message)s")
开发者ID:vrajmohan,项目名称:regulations-parser,代码行数:7,代码来源:eregs.py
示例10: get_keyboard_data
def get_keyboard_data(keyboardID, weekCache=False):
"""
Get Keyboard or package data from web api.
Args:
keyboardID (str): Keyboard or package ID
weekCache (bool) : cache data for 1 week, default is 1 day
Returns:
dict: Keyboard data
"""
logging.info("Getting data for keyboard %s", keyboardID)
api_url = "https://api.keyman.com/keyboard/" + keyboardID
logging.debug("At URL %s", api_url)
home = str(Path.home())
cache_dir = keyman_cache_dir()
current_dir = os.getcwd()
if weekCache:
expire_after = datetime.timedelta(days=7)
else:
expire_after = datetime.timedelta(days=1)
os.chdir(cache_dir)
requests_cache.install_cache(cache_name='keyman_cache', backend='sqlite', expire_after=expire_after)
now = time.ctime(int(time.time()))
response = requests.get(api_url)
logging.debug("Time: {0} / Used Cache: {1}".format(now, response.from_cache))
os.chdir(current_dir)
requests_cache.core.uninstall_cache()
if response.status_code == 200:
return response.json()
else:
return None
开发者ID:tavultesoft,项目名称:keymanweb,代码行数:31,代码来源:get_kmp.py
示例11: get_api_keyboards
def get_api_keyboards(verbose=False):
"""
Get Keyboards data from web api.
Args:
verbose(bool, default False): verbose output
Returns:
dict: Keyboard data
None: if http request not successful
"""
api_url = "https://api.keyman.com/cloud/4.0/keyboards?version=10.0"
headers = {'Content-Type': 'application/json',
'Accept-Encoding': 'gzip, deflate, br'}
home = str(Path.home())
cache_dir = keyman_cache_dir()
current_dir = os.getcwd()
expire_after = datetime.timedelta(days=1)
if not os.path.isdir(cache_dir):
os.makedirs(cache_dir)
os.chdir(cache_dir)
requests_cache.install_cache(cache_name='keyman_cache', backend='sqlite', expire_after=expire_after)
now = time.ctime(int(time.time()))
response = requests.get(api_url, headers=headers)
if verbose:
print("Time: {0} / Used Cache: {1}".format(now, response.from_cache))
os.chdir(current_dir)
if response.status_code == 200:
# return json.loads(response.content.decode('utf-8'))
return response.json()
else:
return None
开发者ID:tavultesoft,项目名称:keymanweb,代码行数:31,代码来源:keymankeyboards.py
示例12: request
def request(self, method, url, params=None, headers=None, to_json=True, data=None, **kwargs):
""" Make request to TC API. """
url, params, headers, data = self.prepare(url, params, headers, data)
if self.options['cache']:
rc.install_cache(self.options['cache'])
elif type(self).cache_installed:
rc.uninstall_cache()
type(self).cache_installed = bool(self.options['cache'])
try:
response = rs.api.request(
method, url, params=params, headers=headers, data=data, **kwargs)
logger.debug(response.content)
response.raise_for_status()
if to_json:
response = response.json()
except (ValueError, rs.HTTPError):
if locals().get('response') is not None:
message = "%s: %s" % (response.status_code, response.content)
raise TCException(message)
raise
return response
开发者ID:Dipsomaniac,项目名称:ticketscloud,代码行数:28,代码来源:ticketscloud.py
示例13: setup
def setup(self):
defaults = dict(name='Matkailu- ja kongressitoimisto')
self.data_source, _ = DataSource.objects.get_or_create(id=self.name, defaults=defaults)
self.tprek_data_source = DataSource.objects.get(id='tprek')
ytj_ds, _ = DataSource.objects.get_or_create(defaults={'name': 'YTJ'}, id='ytj')
org_args = dict(origin_id='0586977-6', data_source=ytj_ds)
defaults = dict(name='Helsingin Markkinointi Oy')
self.organization, _ = Organization.objects.get_or_create(
defaults=defaults, **org_args)
place_list = Place.objects.filter(data_source=self.tprek_data_source, deleted=False)
deleted_place_list = Place.objects.filter(data_source=self.tprek_data_source,
deleted=True)
# Get only places that have unique names
place_list = place_list.annotate(count=Count('name_fi')).filter(count=1).values('id', 'origin_id', 'name_fi')
deleted_place_list = deleted_place_list.annotate(count=Count('name_fi')).\
filter(count=1).values('id', 'origin_id', 'name_fi', 'replaced_by_id')
self.tprek_by_name = {p['name_fi'].lower(): (p['id'], p['origin_id']) for p in place_list}
self.deleted_tprek_by_name = {
p['name_fi'].lower(): (p['id'], p['origin_id'], p['replaced_by_id'])
for p in deleted_place_list}
if self.options['cached']:
requests_cache.install_cache('matko')
开发者ID:City-of-Helsinki,项目名称:linkedevents,代码行数:27,代码来源:matko.py
示例14: test_expire_after_installed
def test_expire_after_installed(self):
requests_cache.install_cache(name=CACHE_NAME, backend=CACHE_BACKEND)
requests_cache.expire_after('http://httpbin.org/get', 2)
r = requests.get('http://httpbin.org/get')
self.assertFalse(r.from_cache)
r = requests.get('http://httpbin.org/get')
self.assertTrue(r.from_cache)
开发者ID:jherre,项目名称:requests-cache,代码行数:7,代码来源:test_monkey_patch.py
示例15: install_cache
def install_cache(expire_after=12 * 3600):
"""
Patches the requests library with requests_cache.
"""
requests_cache.install_cache(
expire_after=expire_after,
allowable_methods=('GET',))
开发者ID:kumar303,项目名称:data-services-helpers,代码行数:7,代码来源:dshelpers.py
示例16: cmd_crtsh
def cmd_crtsh(domain, no_cache, no_validate, verbose):
"""Downloads the certificate transparency logs for a domain
and check with DNS queries if each subdomain exists.
Uses multithreading to improve the performance of the DNS queries.
Example:
\b
$ sudo habu.crtsh securetia.com
[
"karma.securetia.com.",
"www.securetia.com."
]
"""
if verbose:
logging.basicConfig(level=logging.INFO, format='%(message)s')
if not no_cache:
homedir = pwd.getpwuid(os.getuid()).pw_dir
requests_cache.install_cache(homedir + '/.habu_requests_cache', expire_after=3600)
subdomains = set()
if verbose:
print("Downloading subdomain list from https://crt.sh ...", file=sys.stderr)
req = requests.get("https://crt.sh/?q=%.{d}&output=json".format(d=domain))
if req.status_code != 200:
print("[X] Information not available!")
exit(1)
json_data = json.loads(req.text)
for data in json_data:
name = data['name_value'].lower()
if '*' not in name:
subdomains.add(name)
subdomains = list(subdomains)
if no_validate:
print(json.dumps(sorted(subdomains), indent=4))
return True
if verbose:
print("Validating subdomains against DNS servers ...", file=sys.stderr)
answers = query_bulk(subdomains)
validated = []
for answer in answers:
if answer:
validated.append(str(answer.qname))
print(json.dumps(sorted(validated), indent=4))
return True
开发者ID:portantier,项目名称:habu,代码行数:60,代码来源:cmd_crtsh.py
示例17: handle
def handle(self, *args, **options):
if options['cached']:
requests_cache.install_cache('resources_import')
importers = get_importers()
imp_list = ', '.join(sorted(importers.keys()))
imp_name = options.get('module')
if not imp_name:
raise CommandError("Enter the name of the importer module. Valid importers: %s" % imp_list)
if imp_name not in importers:
raise CommandError("Importer %s not found. Valid importers: %s" % (args[0], imp_list))
imp_class = importers[imp_name]
importer = imp_class(options)
# Activate the default language for the duration of the import
# to make sure translated fields are populated correctly.
default_language = settings.LANGUAGES[0][0]
for imp_type in self.importer_types:
name = "import_%s" % imp_type
method = getattr(importer, name, None)
if options[imp_type]:
if not method:
raise CommandError("Importer %s does not support importing %s" % (name, imp_type))
else:
if not options['all']:
continue
if method:
with override(default_language), transaction.atomic():
kwargs = {}
url = options.pop('url', None)
if url:
kwargs['url'] = url
method(**kwargs)
开发者ID:City-of-Helsinki,项目名称:respa,代码行数:34,代码来源:resources_import.py
示例18: test_fred
def test_fred():
filename = "fred"
if expire_after>=0:
requests_cache.install_cache(filename, backend='sqlite', expire_after=expire_after) # expiration seconds
logging.info("Installing cache '%s.sqlite' with expire_after=%d (seconds)" % (filename, expire_after))
if expire_after==0:
logging.warning("expire_after==0 no cache expiration!")
start = datetime.datetime(2010, 1, 1)
end = datetime.datetime(2013, 1, 27)
#name = "GDP"
#name = "CPIAUCSL"
#name = "CPILFESL"
name = ["CPIAUCSL", "CPILFESL"]
#name = ["CPIAUCSL", "CPILFESL", "ERROR"]
data = MyDataReader("FRED").get(name, start, end)
print(data)
gdp = web.DataReader(name, "fred", start, end)
print(gdp)
print(type(gdp))
print(gdp.ix['2013-01-01'])
print(gdp.dtypes)
diff = gdp - data
assert(diff.sum().sum()==0)
开发者ID:gitter-badger,项目名称:pandas_datareaders,代码行数:32,代码来源:test_datareader_fred.py
示例19: install_cache_requests
def install_cache_requests():
requests_cache.install_cache(**{
'allowable_methods': ('GET', 'HEAD'),
'cache_name': conf.REQUESTS_CACHE,
'backend': 'sqlite',
'fast_save': conf.ASYNC_CACHE_WRITES,
'extension': '.sqlite3'})
开发者ID:elifesciences,项目名称:bot-lax-adaptor,代码行数:7,代码来源:cache_requests.py
示例20: get_vhosts
def get_vhosts(ip, first=1, no_cache=False):
"""Returns a list of webs hosted on IP (checks bing.com)
>>> 'www.bing.com' in vhosts(204.79.197.200)
True
"""
if not no_cache:
homedir = pwd.getpwuid(os.getuid()).pw_dir
requests_cache.install_cache(homedir + '/.habu_requests_cache')
url = "http://www.bing.com/search?q=ip:{ip}&first={first}".format(ip=ip, first=first)
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
vhosts = set()
for h2 in soup.find_all('h2'):
for link in h2.find_all('a'):
href = link.get('href')
if href.startswith('http://') or href.startswith('https://'):
vhost = href.split('/')[2]
vhosts.add(vhost)
return list(vhosts)
开发者ID:coolsnake,项目名称:habu,代码行数:27,代码来源:vhosts.py
注:本文中的requests_cache.install_cache函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论