本文整理汇总了Python中requests.get函数的典型用法代码示例。如果您正苦于以下问题：Python get函数的具体用法？Python get怎么用？Python get使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_status_raising
def test_status_raising(self):
    """Check how error statuses surface on a response.

    A 404 reply must make ``raise_for_status()`` raise ``HTTPError``,
    and a 500 reply must report ``ok`` as false.
    """
    not_found = requests.get(httpbin("status", "404"))
    with pytest.raises(requests.exceptions.HTTPError):
        not_found.raise_for_status()

    server_error = requests.get(httpbin("status", "500"))
    assert not server_error.ok
开发者ID:RRedwards,项目名称:requests,代码行数:7,代码来源:test_requests.py
示例2: test_connection_error
def test_connection_error(self):
    """Connecting to an unknown domain should raise a ConnectionError"""
    unreachable_urls = (
        "http://fooobarbangbazbing.httpbin.org",  # host name used as an unknown domain
        "http://httpbin.org:1",                   # known host, port 1
    )
    for target in unreachable_urls:
        with pytest.raises(ConnectionError):
            requests.get(target)
开发者ID:RRedwards,项目名称:requests,代码行数:7,代码来源:test_requests.py
示例3: sources
def sources(self, url, hostDict, hostprDict):
    """Scrape the site for one playable source matching the requested title.

    The title is searched on the site, the first result's page is fetched,
    and the quality label plus the embedded player iframe URL are extracted.

    Args:
        url: Mapping holding the ``'title'`` to look up, or ``None``.
        hostDict: Known hosts, forwarded to ``source_utils.is_host_valid``.
        hostprDict: Not used by this scraper.

    Returns:
        A list with at most one source dict. The list is empty when ``url``
        is ``None`` or when any step of the scrape fails.
    """
    # Bound before the ``try`` so the error path can always return it.
    sources = []
    if url is None:
        return sources

    try:
        h = {'User-Agent': client.randomagent()}
        title = cleantitle.geturl(url['title']).replace('-', '+')
        search_url = urlparse.urljoin(self.base_link, self.search_link % title)

        # The first 'item' block of the search page links to the movie page.
        r = requests.get(search_url, headers=h)
        first_hit = BeautifulSoup(r.text, 'html.parser').find('div', {'class': 'item'})
        movie_url = first_hit.find('a')['href']

        r = requests.get(movie_url, headers=h)
        page = BeautifulSoup(r.content, 'html.parser')
        quality = page.find('span', {'class': 'calidad2'}).text
        stream_url = page.find('div', {'class': 'movieplay'}).find('iframe')['src']

        # Anything that is not an explicit HD label is reported as SD.
        if quality not in ('1080p', '720p'):
            quality = 'SD'

        # NOTE(review): the validity flag is ignored, so a source is appended
        # even for hosts reported as invalid - confirm this is intended.
        _valid, host = source_utils.is_host_valid(stream_url, hostDict)
        sources.append({'source': host, 'quality': quality, 'language': 'en', 'url': stream_url, 'direct': False, 'debridonly': False})
        return sources
    except Exception:
        # Best-effort scraper: report the failure and return what we have.
        # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
        # SystemExit propagate.
        # NOTE(review): the message names a different script/function; it is
        # kept unchanged here in case log consumers match on it.
        exc_type, _exc_obj, exc_tb = sys.exc_info()
        print("Unexpected error in Furk Script: check_api", exc_type)
        print(exc_type, exc_tb.tb_lineno)
        return sources
开发者ID:varunrai,项目名称:repository.magicality,代码行数:28,代码来源:movie4kis.py
示例4: test_where_3_way_logic
def test_where_3_way_logic(self):
    """Query with a ``where`` clause on a column that one row leaves empty.

    First checks that the dataset reports two rows, then runs
    ``select col1 where 'col2' < 2`` and expects an empty result.
    """
    #                    This column
    #                         ⇓
    #      ___________________
    #      |    | col1 | col2 |
    #      -------------------
    #      | r1 |  1   |      |  ⇐ This row
    #      -------------------
    #      | r2 |  1   |  2   |
    #      -------------------
    listing = requests.get(self.dataset_url)
    datasets = json.loads(listing.content)
    self.assertEqual(listing.status_code, 200,
                     msg="Couldn't get the list of datasets")
    row_count = datasets['status']['rowCount']
    self.assertEqual(row_count, 2,
                     msg="2 rows should be in the dataset. r1 and r2")

    query_url = self.dataset_url + '/query'
    query_args = {"select": 'col1', "where": "'col2' < 2"}
    query_reply = requests.get(query_url, params=query_args)
    rows = json.loads(query_reply.content)
    self.assertEqual(len(rows), 0,
                     msg="The query should have returned no results")
开发者ID:BenjaminYu,项目名称:mldb,代码行数:27,代码来源:null_column_test.py
示例5: test_max_timeout
def test_max_timeout(self):
    """Exercise ``render.html`` on a server started with ``--max-timeout=0.1``.

    Each case requests a mock page, optionally with an explicit ``timeout``
    argument, and asserts the status code of the reply.
    """
    # (mock page path, explicit timeout or None, expected HTTP status)
    cases = (
        ("delay?n=1", '0.2', 400),
        ("delay?n=1", '0.1', 504),
        ("delay?n=1", None, 504),
        ("", None, 200),
    )
    with SplashServer(extra_args=['--max-timeout=0.1']) as splash:
        for page, timeout, expected_status in cases:
            endpoint = splash.url("render.html")
            query = {'url': self.mockurl(page)}
            if timeout is not None:
                query['timeout'] = timeout
            reply = requests.get(url=endpoint, params=query)
            self.assertStatusCode(reply, expected_status)
开发者ID:smarthomekit,项目名称:splash,代码行数:35,代码来源:test_render.py
示例6: _request_odl_data
def _request_odl_data(self, host, node_connector_list, flow_statistics_list):
    """Collect the OpenDaylight data to export for one host.

    Args:
        host: Key into ``self.hosts_dict``; the mapped value is used as the
            node id in the inventory URL.
        node_connector_list: Stored unchanged under ``"node_connector_list"``.
        flow_statistics_list: Stored unchanged under ``"flow_statistics_list"``.

    Returns:
        A dict with the two pass-through lists plus, for each statistics
        request that succeeded, the raw ``requests`` response under
        ``"table_flow_statistics"`` / ``"aggregate_flow_statistics"``.
        ``None`` when the connection to OpenDaylight fails.
    """
    # Data to export from OpenDaylight.
    data_dict = {}
    table_url = "%s%s%s%s" % (self._odl_inventory_url, '/node/', self.hosts_dict[host], '/table/0')
    try:
        # Flow table statistics per host (eg. opendaylight, compute, control and neutron)
        try:
            table_flow_statistics = requests.get(
                table_url + '/opendaylight-flow-table-statistics:flow-table-statistics')
            table_flow_statistics.raise_for_status()
            data_dict["table_flow_statistics"] = table_flow_statistics
        except requests.exceptions.HTTPError as err:
            # Single-argument print() gives the same output on Python 2 and 3.
            print("Can not retrieve flow table statistics: %s" % err)

        # Aggregate flow statistics per host (eg. opendaylight, compute, control and neutron)
        try:
            aggregate_flow_statistics = requests.get(table_url + '/aggregate-flow-statistics/')
            aggregate_flow_statistics.raise_for_status()
            data_dict["aggregate_flow_statistics"] = aggregate_flow_statistics
        except requests.exceptions.HTTPError:
            # Left silent on purpose: the original had its message disabled.
            pass

        # Individual flow statistics per host (eg. opendaylight, compute, control and neutron)
        data_dict["flow_statistics_list"] = flow_statistics_list
        # Port statistics per host (eg. opendaylight, compute, control and neutron)
        data_dict["node_connector_list"] = node_connector_list
        return data_dict
    except requests.exceptions.ConnectionError:
        # Qualified like HTTPError above, so this does not depend on a
        # separately imported (or builtin) ``ConnectionError`` name.
        print("Error fetching data from OpenDaylight.")
        return None
开发者ID:icclab,项目名称:opendaylight-prometheus-exporter,代码行数:34,代码来源:opendaylight-prometheus-exporter.py
示例7: updateCCU
def updateCCU(v):
    """Push the outside and collector temperatures to the CCU HTTP API.

    Args:
        v: Mapping read with ``.get('outside_temp')`` and
            ``.get('collector_temp')``; missing keys are sent as ``"None"``.

    Any exception raised while sending is passed to ``logError`` instead of
    propagating. If the first request fails, the second is not attempted.
    """
    ccuUrl = "http://pi:8080/api/set"
    try:
        requests.get(ccuUrl + "/AussenTemp/?value=" + str(v.get('outside_temp')))
        requests.get(ccuUrl + "/KollectorTemp/?value=" + str(v.get('collector_temp')))
    except Exception as e:
        # ``except Exception, e`` is a syntax error on Python 3; the ``as``
        # form works on Python 2.6+ and 3.
        logError(e)
开发者ID:franke1276,项目名称:heatpump,代码行数:7,代码来源:heatpumpMonitor.py
示例8: login_success
# Login callback: when the profile's e-mail is in ``allowed_users`` the home
# page is rendered; otherwise the access token is revoked through Google's
# OAuth2 revoke endpoint before a reply is returned.
# NOTE(review): this excerpt is cut off inside the triple-quoted string that
# the final ``return`` starts, and its indentation was lost in extraction.
def login_success(token, profile):
if profile['email'] in allowed_users:
return render_template('home.html')
#return jsonify(token=token, profile=profile)
else:
requests.get('https://accounts.google.com/o/oauth2/revoke?token='+token['access_token'])
return """
开发者ID:ybrodskiy,项目名称:twilio-pager,代码行数:7,代码来源:pager.py
示例9: get_all_messages
def get_all_messages(self):
    """Page through the group's messages and return the human-written ones.

    Messages are requested 100 at a time; each following page is requested
    with ``before_id`` set to the last message id of the previous page.
    Paging stops when a reply is not HTTP 200, when the reported message
    count has been reached, or when a page comes back empty.

    Returns:
        A list of message dicts, excluding system messages, messages without
        text, messages sent by bots and messages starting with ``/bot``.
    """
    print("Reading messages...")
    endpoint = "https://api.groupme.com/v3/groups/" + self.gid + "/messages"
    r = requests.get(endpoint, params={"token": self.key, "limit": 100})
    message_count = r.json()["response"]["count"]

    i = 0
    out = []
    # ``==`` instead of ``is``: identity comparison of ints only works by
    # accident of small-integer caching.
    while r.status_code == 200 and i < message_count:
        progress(i, message_count)
        messages = r.json()["response"]["messages"]
        if not messages:
            # An empty page cannot advance ``i`` and has no last id to page from.
            break

        for message in messages:
            if message["system"] or message["text"] is None:
                continue
            if message["sender_type"] == u'bot':
                continue
            # ignore bot commands
            if message["text"].startswith("/bot"):
                continue
            out.append(message)

        i += len(messages)
        last_id = messages[-1]["id"]
        r = requests.get(endpoint,
                         params={"token": self.key, "limit": 100, "before_id": last_id})
    return out
开发者ID:rohan,项目名称:groupme-markov,代码行数:34,代码来源:bot.py
示例10: run
def run():
    """Register a query-string-matched mock and request a different URL.

    Only ``/?test=1`` is registered, so the request to ``/foo/?test=2`` is
    expected to raise ``ConnectionError``.
    """
    registered_url = 'http://example.com/?test=1'
    unregistered_url = 'http://example.com/foo/?test=2'

    responses.add(responses.GET, registered_url, match_querystring=True)

    with pytest.raises(ConnectionError):
        requests.get(unregistered_url)
开发者ID:jboning,项目名称:responses,代码行数:7,代码来源:test_responses.py
示例11: main
def main():
    """Fetch the Hacker News front page and report on each marked link.

    For every ``<span class="deadmark">`` on the page, the ``href`` of the
    element that follows it is downloaded, its body text extracted, and the
    pair ``(url, hls().get(text))`` handed to ``printf``.
    """
    # NOTE(review): ``printf`` and ``hls`` are not defined in this excerpt;
    # presumably they are helpers from the surrounding module.
    front_page = requests.get('https://news.ycombinator.com/news')
    soup = BeautifulSoup(front_page.content)
    for marker in soup.find_all('span', attrs={'class': 'deadmark'}):
        webpage = marker.next_sibling.get('href')
        article = BeautifulSoup(requests.get(webpage).content)
        content = article.body.get_text()
        printf((webpage, hls().get(content)))
开发者ID:adi69,项目名称:sexy-snippets,代码行数:7,代码来源:doit.py
示例12: do_api_request
def do_api_request(api_link, method='GET', params=None):
    """Call a SendGrid API endpoint and return the decoded JSON reply.

    Args:
        api_link: Full URL of the endpoint.
        method: ``'GET'`` (case-insensitive) issues a GET; anything else
            issues a POST.
        params: Optional mapping of extra query parameters. It is copied,
            never modified.

    Returns:
        The parsed JSON body, or ``{}`` when the request or the JSON decoding
        fails.
    """
    # Work on a copy. The original signature used a mutable default dict and
    # updated it in place, so credentials leaked into the shared default and
    # into the caller's own dict.
    request_params = dict(params or {})
    # add sendgrid user & api key
    request_params.update({
        'api_user': settings.get('sendgrid_user'),
        'api_key': settings.get('sendgrid_secret'),
    })

    try:
        # NOTE(review): verify=False disables TLS certificate verification
        # while credentials are being sent - confirm this is really needed.
        # The credentials are always present, so params are always passed.
        if method.upper() == 'GET':
            r = requests.get(api_link, params=request_params, verify=False)
        else:
            r = requests.post(api_link, params=request_params, verify=False)
        response = r.json()
    except Exception:
        # Best-effort call: keep returning {} but leave a trace of the failure.
        logging.warning("API request to %s failed", api_link, exc_info=True)
        response = {}

    if settings.get('environment') == "dev":
        # NOTE(review): this logs the API secret along with the other params.
        logging.info("=================")
        logging.info(api_link)
        logging.info(json.dumps(request_params, indent=4))
        logging.info(response)
        logging.info("=================")
    return response
开发者ID:proteusvacuum,项目名称:theconversation,代码行数:35,代码来源:emailsdb.py
示例13: deploy_test_app_and_check
def deploy_test_app_and_check(self, app, test_uuid):
    """Deploy the test server app and verify who it runs as.

    After deployment the app's ``/test_uuid`` endpoint must echo
    ``test_uuid`` and its ``/operating_environment`` endpoint must report
    the expected container user.

    In a mesos container, this will be the marathon user.
    In a docker container this user comes from the USER setting
    from the app's Dockerfile, which, for the test application
    is the default, root.
    """
    def fetch_or_raise(url_template, service_point):
        # GET the endpoint on the given service point; non-200 replies raise.
        reply = requests.get(url_template.format(service_point.host, service_point.port))
        if reply.status_code != 200:
            msg = "Test server replied with non-200 reply: '{0} {1}. "
            msg += "Detailed explanation of the problem: {2}"
            raise Exception(msg.format(reply.status_code, reply.reason, reply.text))
        return reply

    runs_in_docker = 'container' in app and app['container']['type'] == 'DOCKER'
    marathon_user = 'root' if runs_in_docker else app.get('user', self.default_os_user)

    with self.deploy_and_cleanup(app) as service_points:
        uuid_reply = fetch_or_raise('http://{}:{}/test_uuid', service_points[0])
        r_data = uuid_reply.json()
        assert r_data['test_uuid'] == test_uuid

        env_reply = fetch_or_raise('http://{}:{}/operating_environment', service_points[0])
        assert env_reply.json() == {'username': marathon_user}
开发者ID:cmaloney,项目名称:dcos,代码行数:35,代码来源:marathon.py
示例14: hopcam
def hopcam():
    """Render ``hopcam.html`` with revmic's Dailymotion video titled 'yesterday'.

    The video is first looked up with a title search. If that yields
    nothing, all of the owner's videos are listed and the last one whose
    title is exactly 'yesterday' (ignoring case) is used. When neither
    finds a video, a placeholder dict with an apology title is rendered.
    """
    # Should return a list with revmic's dailymotion videos entitled 'yesterday'
    search_uri = ('https://api.dailymotion.com/videos?owners=revmic&'
                  'search=yesterday&fields=id,title,description,embed_url,'
                  'thumbnail_480_url,views_total')
    search_reply = requests.get(search_uri)
    print(search_reply.json())

    try:
        # Get last item in list in case there are multiples (prev delete failed)
        yda_video = get_list(search_reply)[-1]
    except IndexError as e:
        yda_video = {'title': "Sorry. Couldn't find yesterday's video :'("}
        print('IndexError - ', e)

    if 'Sorry' in yda_video['title']:  # Try again, rummage through all videos
        print("trying again")
        all_videos_uri = ('https://api.dailymotion.com/videos?owners=revmic&'
                          'fields=id,title,description,embed_url,'
                          'thumbnail_480_url,views_total')
        videos = get_list(requests.get(all_videos_uri))
        print(videos)
        matches = [video for video in videos if video['title'].lower() == 'yesterday']
        if matches:
            yda_video = matches[-1]

    return render_template('hopcam.html', yesterday=yda_video)
开发者ID:revmic,项目名称:hhfarms,代码行数:28,代码来源:app.py
示例15: query
# Download Google image-search results for ``query_term``.
# Images go into ``<path>/<folder_name>`` (spaces in the folder name become
# underscores); the directory is created when it does not exist. Each
# result's ``unescapedUrl`` is fetched and re-saved as a JPEG named
# ``<query_term with underscores>_<imageId>.jpg``. Downloads that fail with
# ConnectionError and images that cannot be saved (IOError) are skipped.
# NOTE(review): Python 2 syntax (print statements, ``except X, e``).
# NOTE(review): the excerpt ends at ``finally:`` with no body, so the rest of
# the loop (including how ``start`` advances) is not visible here, and the
# indentation was lost in extraction.
def query(query_term, folder_name, path):
BASE_URL = 'https://ajax.googleapis.com/ajax/services/search/images?' + 'v=1.0&q=' + query_term + '&start=%d'
BASE_PATH = os.path.join(path, folder_name.replace(' ', '_'))
if not os.path.exists(BASE_PATH):
os.makedirs(BASE_PATH)
print "made: " + BASE_PATH
start = 0 # start query string parameter for pagination
while start < 40: # query 20 pages
r = requests.get(BASE_URL % start)
for image_info in json.loads(r.text)['responseData']['results']:
url = image_info['unescapedUrl']
try:
image_r = requests.get(url)
except ConnectionError, e:
print 'could not download %s' % url
continue
#remove file-system path characters from name
title = query_term.replace(' ', '_') + '_' + image_info['imageId']
file = open(os.path.join(BASE_PATH, '%s.jpg') % title, 'w')
try:
Image.open(StringIO(image_r.content)).save(file, 'JPEG')
except IOError, e:
# throw away gifs and stuff
print 'couldnt save %s' % url
continue
finally:
开发者ID:isaacchansky,项目名称:image-classification,代码行数:31,代码来源:getGoogleImgs.py
示例16: downloadXkcd
def downloadXkcd(startComic, endComic):
    """Download the xkcd comic images numbered ``startComic`` to ``endComic - 1``.

    For each number the comic page is fetched, the ``#comic img`` element is
    located, and the image is written to the ``xkcd`` directory under its
    original file name. This function does not create the directory. Pages
    without a comic image are reported and skipped.

    Args:
        startComic: First comic number to download.
        endComic: Comic number to stop before (exclusive).

    Raises:
        requests.exceptions.HTTPError: Propagated from ``raise_for_status()``
            when a page or image request returns an error status.
    """
    for urlNumber in range(startComic, endComic):
        # download the page
        print('Downloading page http://xkcd.com/%s...' % (urlNumber))
        res = requests.get('http://xkcd.com/%s' % (urlNumber))
        res.raise_for_status()

        soup = bs4.BeautifulSoup(res.text)

        # find the url of the comic image
        comicElem = soup.select('#comic img')
        if not comicElem:
            print('Could not find comic image.')
            continue

        comicUrl = comicElem[0].get('src')
        # A protocol-relative src ("//host/path") has no scheme, which
        # requests rejects; give it one.
        if comicUrl and comicUrl.startswith('//'):
            comicUrl = 'http:' + comicUrl

        # download the image
        print('Downloading image %s...' % (comicUrl))
        res = requests.get(comicUrl)
        res.raise_for_status()

        # save img to ./xkcd; ``with`` closes the file even if a write fails
        with open(os.path.join('xkcd', os.path.basename(comicUrl)), 'wb') as imageFile:
            for chunk in res.iter_content(1000000):
                imageFile.write(chunk)
开发者ID:cssidy,项目名称:python-exercises,代码行数:25,代码来源:multiDownloadXkcd.py
示例17: test_small_layer
def test_small_layer(svc_url, svc_data):
    """
    Test a service endpoint to see if the layer is small based on some simple rules.

    Only point-like geometry types are considered. For those, the endpoint is
    first asked for a feature count; if the reply has no ``count``, it is asked
    for the object ids and those are counted instead. Any failure along the way
    (missing key, request error, undecodable reply) yields ``False``.

    :param svc_url: The URL pointing to the feature endpoint
    :type svc_url: str
    :param svc_data: A dictionary containing scraped data from an ESRI feature service endpoint
    :type svc_data: dict
    :returns: bool -- True if the layer is considered 'small'
    """
    # FIXME needs refactoring, better error handling and better logic
    point_like_types = ('esriGeometryPoint', 'esriGeometryMultipoint', 'esriGeometryEnvelope')
    max_small_features = 2000
    count_query = '/query?where=1%3D1&returnCountOnly=true&f=pjson'
    id_query = '/query?where=1%3D1&returnIdsOnly=true&f=json'

    try:
        if svc_data['geometryType'] not in point_like_types:
            return False

        base_url = get_base_url(svc_url)

        # Parse each reply once instead of calling .json() repeatedly.
        count_reply = requests.get(base_url + count_query, proxies=_proxies).json()
        if 'count' in count_reply:
            return count_reply['count'] <= max_small_features

        ids_reply = requests.get(base_url + id_query, proxies=_proxies).json()
        if 'objectIds' in ids_reply:
            return len(ids_reply['objectIds']) <= max_small_features
    except Exception:
        # Best-effort probe: a layer that cannot be measured is not "small".
        # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
        # SystemExit propagate.
        return False
    return False
开发者ID:dan-bowerman,项目名称:rcs,代码行数:25,代码来源:esri.py
示例18: search
def search():
    """Prompt for a search term and collect matching YouTube video ids.

    The first result page is passed to ``gets_video_id``; up to five further
    pages are then fetched through ``nextPageToken`` and their video ids
    appended.

    Returns:
        A 4-tuple ``(chunks, stuff, channelTitle, num_results)``: the ids
        split into lists of at most 50, the flat list of all ids, the
        capitalised channel title of the first result, and the total result
        count reported by the API, as a float.
    """
    # NOTE(review): Python 2 code (``raw_input``, str/unicode ``encode``).
    # NOTE(review): the indentation was lost in extraction; the per-page
    # ``for`` loop is assumed to sit inside the ``while`` loop - confirm
    # against the original file.
    query = raw_input('Search: ').replace(" ", "+")
    url = base_url_search + snip + "&q=" + query + "&type=video" + key
    content = json.loads(requests.get(url).text)
    stuff = gets_video_id(content)
    num = 0
    channelTitle = content['items'][0]['snippet']['channelTitle'].capitalize()
    num_results = float(int(content['pageInfo']['totalResults']))

    # ``.get``: a page without 'nextPageToken' ends paging instead of
    # raising KeyError.
    while content.get('nextPageToken') and num < 5:
        next_page = content['nextPageToken'].encode('UTF8')
        url = base_url_search + snip + "&q=" + query + "&type=video&pageToken=" + next_page + key
        content = json.loads(requests.get(url).text)
        num += 1
        for videos in content['items']:
            if videos['id']['kind'] == 'youtube#video':
                stuff.append(videos['id']['videoId'])

    stuff = [x.encode('UTF8') for x in stuff]
    chunks = [stuff[i:i + 50] for i in range(0, len(stuff), 50)]
    return chunks, stuff, channelTitle, num_results
开发者ID:patricknelli,项目名称:youtube_stats,代码行数:29,代码来源:main.py
示例19: get_opendata2_courses
def get_opendata2_courses():
    """Dump the course data of every department to a per-department JSON file.

    Departments are read from ``opendata2_departments.json``. For each one
    the course listing is fetched, then every course's own endpoint is
    polled, and the collected ``data`` payloads are written to
    ``opendata2_courses/<department>.json`` next to this module. The number
    of courses polled is printed at the end.
    """
    here = os.path.realpath(os.path.dirname(__file__))
    departments_path = os.path.join(
        here, '%s/opendata2_departments.json' % c.DEPARTMENTS_DATA_DIR)
    with open(departments_path) as departments_file:
        departments = json.load(departments_file)

    listing_url = 'https://api.uwaterloo.ca/v2/courses/{0}.json?key={1}'
    course_url = 'https://api.uwaterloo.ca/v2/courses/{0}/{1}.json?key={2}'
    good_courses = 0

    # Create a text file for every department
    for entry in departments:
        department = entry['subject']
        # NOTE(review): ``.json`` is read as an attribute here, not called;
        # presumably this targets a requests version where it was a property.
        listing = requests.get(
            listing_url.format(department.upper(), s.OPEN_DATA_API_KEY)).json
        catalog_numbers = [course['catalog_number'] for course in listing['data']]

        # We now poll the individual endpoints of each course for the data
        current_dep_json = []
        for catalog_number in catalog_numbers:
            good_courses += 1
            detail = requests.get(course_url.format(
                department.upper(), catalog_number, s.OPEN_DATA_API_KEY)).json
            current_dep_json.append(detail['data'])

        out_file_name = os.path.join(
            here, 'opendata2_courses/%s.json' % department.lower())
        with open(out_file_name, 'w') as courses_out:
            json.dump(current_dep_json, courses_out)

    print('Found {num} good courses'.format(num=good_courses))
开发者ID:GautamGupta,项目名称:rmc,代码行数:35,代码来源:crawler.py
示例20: get_posts_data
def get_posts_data(
    self, blog, id=None, get_comments=False, *args, **options
):
    """Fetch posts, or one post's comments, from a WordPress JSON API.

    Args:
        blog: Blog address, with or without an ``http://`` / ``https://``
            prefix. It is stored on ``self.url``.
        id: Post id; only used when ``get_comments`` is ``True``.
        get_comments: When exactly ``True``, the comments of post ``id``
            are fetched instead of the post list.

    Returns:
        The decoded JSON data. When ``self.blog_to_migrate`` is
        ``"just_testing"``, the contents of ``test-data-comments.json``
        are returned instead and no request is made.
    """
    if self.blog_to_migrate == "just_testing":
        with open('test-data-comments.json') as test_json:
            return json.load(test_json)

    self.url = blog
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }
    if self.username and self.password:
        credentials = '{}:{}'.format(self.username, self.password)
        # b64encode returns bytes on Python 3; decode it so the header reads
        # "Basic <token>" rather than "Basic b'<token>'".
        auth = b64encode(credentials.encode('utf-8')).decode('ascii')
        headers['Authorization'] = 'Basic {}'.format(auth)

    # Accept https:// too; previously "https://host" became
    # "http://https://host".
    if self.url.startswith(('http://', 'https://')):
        base_url = self.url
    else:
        base_url = ''.join(('http://', self.url))
    posts_url = ''.join((base_url, '/wp-json/posts'))

    if get_comments is True:
        comments_url = ''.join((posts_url, '/%s/comments')) % id
        # Same headers (including auth) as the posts request, so comments on
        # a protected blog can be fetched too.
        fetched_comments = requests.get(comments_url, headers=headers)
        comments_data = self.clean_data(fetched_comments.text)
        return json.loads(comments_data)

    fetched_posts = requests.get(posts_url, headers=headers)
    data = self.clean_data(fetched_posts.text)
    return json.loads(data)
开发者ID:tomdyson,项目名称:wagtail_blog,代码行数:33,代码来源:wordpress_to_wagtail.py
注：本文中的requests.get函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论