本文整理汇总了Python中responses.add函数的典型用法代码示例。如果您正苦于以下问题:Python add函数的具体用法?Python add怎么用?Python add使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了add函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_add_feature
def test_add_feature(self):
    """Check that ``self.client.add_feature`` returns the created feature.

    The POST to the feature endpoint is mocked with a canned JSON body.
    The test then asserts that the result exposes the fields that only
    appear in that mocked body (``id``, ``resource_uri``, ``creation``)
    and that every field passed to ``add_feature`` comes back unchanged.
    """
    responses.add(
        method="POST",
        url="http://bipolar.test/api/v1/feature/",
        body='{"boolean_permission": true, "creation": "2014-05-23T08:54:42.077131", "id": 10, "limit_permission": null, "name": "data.business", "permission_level": "qualifier", "permission_type": "boolean", "resource_uri": "/api/v1/feature/10/"}'
    )

    resp = self.client.add_feature(
        name="data.business",
        permission_type='boolean',
        boolean_permission=True,
        limit_permission=None,
        permission_level='qualifier',
    )

    expected = {
        'name': 'data.business',
        'permission_type': 'boolean',
        'boolean_permission': True,
        'limit_permission': None,
        'permission_level': 'qualifier',
    }

    # assertIn reports the actual keys on failure, unlike assertTrue(x in y).
    for response_only_field in ("id", "resource_uri", "creation"):
        self.assertIn(response_only_field, resp.keys())

    # Comparing (key, value) pairs keeps the field name in the failure message.
    for k, v in expected.items():
        self.assertEqual((k, resp[k]), (k, v))
开发者ID:marinho,项目名称:bipolar-python-client,代码行数:26,代码来源:unit.py
示例2: test_content
def test_content(self):
    """Check that ``GistAPI.content`` maps each filename to its text.

    The mocked gist payload passes each file's content through
    ``b64encode``; the test expects the plain strings back, keyed by
    filename.
    """
    gist_payload = {
        "files": {
            "file-A.txt": {
                "filename": "file-A.txt",
                "content": b64encode("test-content-A"),
            },
            "file-B.txt": {
                "filename": "file-B.txt",
                "content": b64encode("test-content-\u212C"),
            },
        },
        "description": "test-gist",
        "public": True,
        "id": 1,
    }
    responses.add(
        responses.GET,
        'https://api.github.com/gists/1',
        body=json.dumps(gist_payload),
        status=200,
    )

    files = gist.GistAPI(token='foo').content('1')

    self.assertEqual(len(files), 2)
    self.assertTrue('file-A.txt' in files)
    self.assertTrue('file-B.txt' in files)
    self.assertEqual(files['file-A.txt'], 'test-content-A')
    self.assertEqual(files['file-B.txt'], 'test-content-\u212C')
开发者ID:cc7x0mohTA,项目名称:gist,代码行数:27,代码来源:test_gist.py
示例3: test_update_topology_command
def test_update_topology_command(self):
    """Run the topology update against a good feed, then against a bad one.

    First phase: the mocked endpoint on port 9090 serves
    ``netjson-1-link.json`` and ``update_topology()`` is expected to leave
    2 nodes and 1 link. Second phase: the topology is pointed at port 9091,
    which serves ``netjson-invalid.json``, and the management command is
    expected to print a message containing ``'Failed to'``.
    """
    topology = Topology.objects.first()
    topology.parser = 'netdiff.NetJsonParser'
    topology.save()
    responses.add(
        responses.GET,
        'http://127.0.0.1:9090',
        body=self._load('static/netjson-1-link.json'),
        content_type='application/json',
    )
    Node.objects.all().delete()
    update_topology()
    self.assertEqual(Node.objects.count(), 2)
    self.assertEqual(Link.objects.count(), 1)

    # Failure path: retarget the topology at a URL serving invalid data.
    topology.url = topology.url.replace('9090', '9091')
    topology.save()
    Node.objects.all().delete()
    Link.objects.all().delete()
    responses.add(
        responses.GET,
        'http://127.0.0.1:9091',
        body=self._load('static/netjson-invalid.json'),
        content_type='application/json',
    )

    # Capture what the management command prints.
    captured = StringIO()
    with redirect_stdout(captured):
        call_command('update_topology')

    self.assertEqual(Node.objects.count(), 1)
    self.assertEqual(Link.objects.count(), 0)
    self.assertIn('Failed to', captured.getvalue())
开发者ID:barseghyanartur,项目名称:django-netjsongraph,代码行数:29,代码来源:test_topology.py
示例4: test_login_url
def test_login_url():
    """Check the login view fetches a request token and redirects.

    The request-token endpoint is mocked. The test asserts that exactly one
    outgoing call was made, that its ``Authorization`` header carries the
    expected OAuth parameters, and that the client is redirected to the
    authorization URL with the mocked token.
    """
    responses.add(
        responses.POST,
        "https://example.com/oauth/request_token",
        body="oauth_token=foobar&oauth_token_secret=bazqux",
    )
    app, _ = make_app()
    resp = app.test_client().get(
        "/login/test-service", base_url="https://a.b.c", follow_redirects=False
    )

    # A single request-token call must have been issued.
    assert len(responses.calls) == 1
    token_request = responses.calls[0].request
    assert "Authorization" in token_request.headers
    raw_header = token_request.headers["Authorization"].decode("utf-8")
    oauth_params = dict(parse_authorization_header(raw_header))
    assert oauth_params["oauth_consumer_key"] == "client_key"
    assert "oauth_signature" in oauth_params
    expected_callback = quote_plus("https://a.b.c/login/test-service/authorized")
    assert oauth_params["oauth_callback"] == expected_callback

    # The client must be sent on to the provider's authorize page.
    assert resp.status_code == 302
    expected_location = "https://example.com/oauth/authorize?oauth_token=foobar"
    assert resp.headers["Location"] == expected_location
开发者ID:singingwolfboy,项目名称:flask-dance,代码行数:30,代码来源:test_oauth1.py
示例5: test_authorized_url
def test_authorized_url():
    """Check the authorized view swaps the verifier for an access token.

    The access-token endpoint is mocked. The test asserts the redirect
    target, the OAuth parameters sent in the single outgoing call, and the
    token data stored in the Flask session.
    """
    responses.add(
        responses.POST,
        "https://example.com/oauth/access_token",
        body="oauth_token=xxx&oauth_token_secret=yyy",
    )
    app, _ = make_app()
    # Everything stays inside the client context because flask.session is
    # read at the end.
    with app.test_client() as client:
        resp = client.get(
            "/login/test-service/authorized?oauth_token=foobar&oauth_verifier=xyz",
            base_url="https://a.b.c",
        )

        # The client is redirected back to the site root.
        assert resp.status_code == 302
        assert resp.headers["Location"] == "https://a.b.c/"

        # Exactly one access-token request, carrying the OAuth parameters.
        assert len(responses.calls) == 1
        token_request = responses.calls[0].request
        assert "Authorization" in token_request.headers
        raw_header = token_request.headers["Authorization"].decode("utf-8")
        oauth_params = dict(parse_authorization_header(raw_header))
        assert oauth_params["oauth_consumer_key"] == "client_key"
        assert oauth_params["oauth_token"] == "foobar"
        assert oauth_params["oauth_verifier"] == "xyz"

        # The token and secret from the mocked body end up in the session.
        stored_token = flask.session["test-service_oauth_token"]
        assert stored_token == {
            "oauth_token": "xxx",
            "oauth_token_secret": "yyy",
        }
开发者ID:singingwolfboy,项目名称:flask-dance,代码行数:31,代码来源:test_oauth1.py
示例6: test_signal_oauth_error_login
def test_signal_oauth_error_login(request):
    """Check ``oauth_error`` fires once when the request-token call fails.

    The request-token endpoint is mocked to answer 401. The test records
    every ``oauth_error`` emission and asserts the sender, the message and
    the redirect back to the site root.
    """
    responses.add(
        responses.POST,
        "https://example.com/oauth/request_token",
        body="oauth_problem=nonce_used",
        status=401,
    )
    app, bp = make_app()
    received = []

    def record_signal(*args, **kwargs):
        received.append((args, kwargs))

    oauth_error.connect(record_signal)
    # Disconnect at teardown so the handler does not outlive this test.
    request.addfinalizer(lambda: oauth_error.disconnect(record_signal))

    with app.test_client() as client:
        resp = client.get("/login/test-service", base_url="https://a.b.c")

    assert len(received) == 1
    signal_args, signal_kwargs = received[0]
    assert signal_args == (bp,)
    expected_message = (
        "Token request failed with code 401, response was 'oauth_problem=nonce_used'."
    )
    assert signal_kwargs["message"] == expected_message
    assert resp.status_code == 302
    assert resp.headers["Location"] == "https://a.b.c/"
开发者ID:singingwolfboy,项目名称:flask-dance,代码行数:29,代码来源:test_oauth1.py
示例7: assert_url_correct
def assert_url_correct(self, method, url_, request, *args, **kwargs):
    """Assert ``request(*args, **kwargs)`` calls ``url_`` with ``kwargs`` as params.

    Args:
        method: HTTP method to register with ``responses`` for ``url_``.
        url_: URL (without query string) the request is expected to hit.
        request: Callable under test; invoked as ``request(*args, **kwargs)``.
        *args: Positional arguments forwarded to ``request``.
        **kwargs: Keyword arguments forwarded to ``request``. Every query
            parameter other than ``token`` must match one of these.

    An ``errors.ApiError`` raised by ``request`` is tolerated, because only
    the outgoing URL is under test.
    """
    # Mock the request.
    responses.add(method, url_)

    # Make the request. ApiErrors are ok.
    try:
        request(*args, **kwargs)
    except errors.ApiError:
        pass

    # Split the params from the URL.
    recorded_url = responses.calls[0].request.url
    requested_url, _, param_str = recorded_url.partition('?')

    # Check the URL.
    self.assertEqual(requested_url, url_)

    # Nothing more to check when there is no (or an empty) query string.
    if not param_str:
        return

    # Check the params. Allow for the API token to be found.
    for pair in param_str.split('&'):
        # partition() splits on the first '=' only, so a value containing
        # '=' or a parameter without '=' cannot break the unpacking.
        key, _, value = pair.partition('=')
        if key == 'token':
            continue
        self.assertIn(key, kwargs)
        self.assertEqual(str(kwargs[key]), urllib.parse.unquote(value))
开发者ID:fraben,项目名称:Groupy,代码行数:27,代码来源:testendpoint.py
示例8: mock_submit_job_and_poll_result
def mock_submit_job_and_poll_result(
    job,
    job_state,
    result=None,
    error=None
):
    """Mock the submit and poll endpoints, then submit ``job``.

    Registers a 201 response for the session's ``submit-job`` URL and a 200
    response for the job's polling URL, then returns whatever
    ``client_test.submit(job)`` returns.

    Args:
        job: Passed unchanged to ``client_test.submit``.
        job_state: ``state`` value placed in the mocked polling response.
        result: ``result`` value placed in the mocked polling response.
        error: ``error`` value placed in the mocked polling response.
    """
    session_uri = base_uri + "/sessions/" + str(session_id)
    submit_request_mock_uri = session_uri + "/submit-job"
    poll_request_mock_uri = session_uri + "/jobs/" + str(job_id)

    # Body returned by the submit call.
    responses.add(
        responses.POST,
        submit_request_mock_uri,
        status=201,
        json={
            u'state': u'SENT', u'error': None, u'id': job_id, u'result': None
        },
        content_type='application/json',
    )

    # Body returned by the poll call, built from the caller's arguments.
    responses.add(
        responses.GET,
        poll_request_mock_uri,
        status=200,
        json={
            u'state': job_state, u'error': error, u'id': job_id, u'result': result
        },
        content_type='application/json',
    )

    return client_test.submit(job)
开发者ID:cloudera,项目名称:livy,代码行数:25,代码来源:client_test.py
示例9: test_ReleaseNews
def test_ReleaseNews(mock_send_msg,
                     versions_json,
                     versions_json_content,
                     heise_html):
    """Check ``ReleaseNews`` loads versions and records a checked one.

    The heise download page is mocked with ``heise_html``. The test checks
    ``version_dict`` for two different ``versions_file`` arguments, then
    registers a ``firefox`` checker and asserts ``version_dict`` after
    ``checker()`` has run.
    """
    heise_url = 'http://www.heise.de/download/firefox.html'
    responses.add(responses.GET, heise_url, body=heise_html, status=200)

    populated = ReleaseNews(versions_file=versions_json_content)
    assert populated.version_dict == {
        u'jre': u'Java Runtime Environment (JRE) 8u25',
        u'firefox': u'Firefox Setup 34.0.5.exe',
        u'acrobat_reader': u'11.0.10'}

    release_news = ReleaseNews(versions_file=versions_json)
    assert release_news.version_dict == {}

    @release_news.check_this
    def firefox():
        return return_check_this(get_version_from_heise(heise_url), heise_url)

    # NOTE(review): the two address arguments look like redacted
    # placeholders rather than real addresses -- confirm against the
    # upstream test.
    release_news.checker('[email protected]', 'test', '[email protected]')
    assert release_news.version_dict == {'firefox': u'Firefox 34.0.5'}
开发者ID:xsteadfastx,项目名称:release_news,代码行数:29,代码来源:test_main.py
示例10: test_insecure_flag
def test_insecure_flag(self):
    """Check ``main_noexc`` exits with code 0 for both ``-s`` and ``-ks``.

    The user-info endpoint is mocked and the two configuration helpers in
    ``enstaller.main`` are patched, so each invocation runs against the
    prepared ``Configuration``.
    """
    # Given
    responses.add(
        responses.GET,
        "https://acme.com/accounts/user/info/",
        body=json.dumps(R_JSON_AUTH_RESP),
    )
    config = Configuration()
    config.update(store_url="https://acme.com")
    config.update(auth=("nono", "le gros robot"))

    # The two invocations differ only in the flag argument.
    for argv in (["-s", "fubar"], ["-ks", "fubar"]):
        # When
        with self.assertRaises(SystemExit) as exit_ctx:
            with mock.patch("enstaller.main._ensure_config_or_die",
                            return_value=config):
                with mock.patch(
                    "enstaller.main.ensure_authenticated_config"
                ):
                    main_noexc(argv)

        # Then
        self.assertEqual(exit_ctx.exception.code, 0)
开发者ID:enthought,项目名称:enstaller,代码行数:33,代码来源:test_misc.py
示例11: test_text_playlist_with_mpeg_stream
def test_text_playlist_with_mpeg_stream(
        self, scanner, provider, caplog):
    """Check a text playlist URI is unwrapped to the stream URI inside it.

    ``scanner.scan`` is stubbed to report the playlist as non-playable
    ``text/foo`` and the stream as playable ``audio/mpeg``. The playlist
    body is served through ``responses``. The test asserts the scan calls,
    the translated URI, the log output and the ``User-Agent`` header of the
    outgoing request.
    """
    scanner.scan.side_effect = [
        # Scanning playlist
        mock.Mock(mime='text/foo', playable=False),
        # Scanning stream
        mock.Mock(mime='audio/mpeg', playable=True),
    ]
    responses.add(
        responses.GET, PLAYLIST_URI,
        body=BODY, content_type='audio/x-mpegurl')

    result = provider.translate_uri(PLAYLIST_URI)

    assert scanner.scan.mock_calls == [
        mock.call(PLAYLIST_URI, timeout=mock.ANY),
        mock.call(STREAM_URI, timeout=mock.ANY),
    ]
    assert result == STREAM_URI

    # Check logging to ensure debuggability.
    # The two "Unwrapping stream" asserts used to test a bare non-empty
    # string, which is always true; they now check the captured log text
    # like the others.
    log_text = caplog.text()
    assert 'Unwrapping stream from URI: %s' % PLAYLIST_URI in log_text
    assert 'Parsed playlist (%s)' % PLAYLIST_URI in log_text
    assert 'Unwrapping stream from URI: %s' % STREAM_URI in log_text
    assert (
        'Unwrapped potential audio/mpeg stream: %s' % STREAM_URI
        in log_text)

    # Check proper Requests session setup
    assert responses.calls[0].request.headers['User-Agent'].startswith(
        'Mopidy-Stream/')
开发者ID:AddBassStudios,项目名称:mopidy,代码行数:32,代码来源:test_playback.py
示例12: test_after_link_issue
def test_after_link_issue(self):
    """Check ``after_link_issue`` posts the comment to the issue's comments URL.

    The Bitbucket comments endpoint is mocked with a 201 response. The test
    asserts the recorded response status and that the JSON body sent was
    ``{'content': {'raw': <comment text>}}``.
    """
    issue_id = 3
    repo = 'myaccount/myrepo'
    comment = {'comment': 'hello I\'m a comment'}

    comments_url = (
        u'https://api.bitbucket.org/2.0/repositories/{repo}/issues/{issue_id}/comments'.format(
            repo=repo, issue_id=issue_id)
    )
    responses.add(
        responses.POST,
        comments_url,
        status=201,
        json={'content': {'raw': comment}},
    )

    external_issue = ExternalIssue.objects.create(
        organization_id=self.organization.id,
        integration_id=self.integration.id,
        key='%s#%d' % (repo, issue_id),
    )

    installation = self.integration.get_installation(
        external_issue.organization_id)
    installation.after_link_issue(external_issue, data=comment)

    recorded_call = responses.calls[0]
    assert recorded_call.response.status_code == 201
    sent_payload = json.loads(recorded_call.request.body)
    assert sent_payload == {'content': {'raw': comment['comment']}}
开发者ID:mjumbewu,项目名称:sentry,代码行数:26,代码来源:test_issues.py
示例13: test_upload
def test_upload():
    """Check that building with ``upload='pypi'`` makes exactly one HTTP call.

    The upload URL is mocked with a 200 response and
    ``flit.upload.get_repository`` is patched to return ``repo_settings``.
    """
    responses.add(responses.POST, upload.PYPI, status=200)

    ini_path = samples_dir / 'module1-pkg.ini'
    with patch('flit.upload.get_repository', return_value=repo_settings):
        wheel.wheel_main(ini_path, upload='pypi')

    call_count = len(responses.calls)
    assert call_count == 1
开发者ID:adamchainz,项目名称:flit,代码行数:7,代码来源:test_upload.py
示例14: run
def run():
    """Check a request to an unregistered URL raises ``ConnectionError``.

    Only ``http://example.com/?test=1`` is registered (with query-string
    matching on); the request goes to a different path and query.
    """
    registered_url = 'http://example.com/?test=1'
    unregistered_url = 'http://example.com/foo/?test=2'

    responses.add(responses.GET, registered_url, match_querystring=True)

    with pytest.raises(ConnectionError):
        requests.get(unregistered_url)
开发者ID:jboning,项目名称:responses,代码行数:7,代码来源:test_responses.py
示例15: mock_file_apis
def mock_file_apis(job_command, job_func, job_func_arg):
    """Mock a session file endpoint and invoke the matching client call.

    Registers a 201 response with an empty body for
    ``<base_uri>/sessions/<session_id>/<job_command>``.

    Args:
        job_command: Last path segment of the endpoint to mock.
        job_func: Callable to invoke once the mock is registered.
        job_func_arg: Single argument passed to ``job_func``.

    Returns:
        Whatever ``job_func(job_func_arg)`` returns.
    """
    session_uri = base_uri + "/sessions/" + str(session_id)
    request_uri = session_uri + "/" + job_command
    responses.add(
        responses.POST,
        request_uri,
        status=201,
        body='',
        content_type='application/json',
    )
    return job_func(job_func_arg)
开发者ID:cloudera,项目名称:livy,代码行数:7,代码来源:client_test.py
示例16: test_assignee_search
def test_assignee_search(self):
    """Check the Jira search endpoint returns assignable users.

    The Jira project list is mocked statically. The assignable-user search
    is mocked with a callback that also checks the query parameters it
    receives. The test then calls the search view and asserts its status
    and data.
    """
    responses.add(
        responses.GET,
        'https://example.atlassian.net/rest/api/2/project',
        json=[{'key': 'HSP', 'id': '10000'}],
        match_querystring=False
    )

    def responder(request):
        # The view is expected to send the project key 'HSP' (the mocked
        # project list pairs it with id '10000') along with the query.
        params = parse_qs(urlparse(request.url).query)
        assert params['project'][0] == 'HSP'
        assert params['query'][0] == 'bob'
        return (200, {}, SAMPLE_USER_SEARCH_RESPONSE)

    responses.add_callback(
        responses.GET,
        'https://example.atlassian.net/rest/api/2/user/assignable/search',
        callback=responder,
        content_type='json',
        match_querystring=False
    )

    org = self.organization
    self.login_as(self.user)

    path = reverse('sentry-extensions-jira-search', args=[org.slug, self.integration.id])
    search_url = '%s?project=10000&field=assignee&query=bob' % (path,)
    resp = self.client.get(search_url)

    assert resp.status_code == 200
    # NOTE(review): the address in the expected label looks like a redacted
    # placeholder -- confirm it matches SAMPLE_USER_SEARCH_RESPONSE.
    assert resp.data == [
        {'value': 'deadbeef123', 'label': 'Bobby - [email protected]'}
    ]
开发者ID:yaoqi,项目名称:sentry,代码行数:31,代码来源:test_search_endpoint.py
示例17: test_get_anime
def test_get_anime(self):
    """
    Tests the flask 'Controller' get_anime where it gets an anime
    and decides whether to fetch or update its subgroups from anidb
    """
    # NOTE(review): `subs` is never used after construction -- confirm
    # whether it was meant to be attached to `expected`.
    subs = [Sub_Group(gid="1234",name="poo subs",state="done",lastep="8",eprange="1-3", audio="japanese",subs="english")]
    expected = Anime(aid="1234",eng_title="english",jpn_title="japan",eng_jpn="jpneng")

    # Set the fake response from the anidb client.
    responses.add(responses.GET,
                  url='http://anidb.net/perl-bin/animedb.pl?show=anime&aid='+str(expected.aid)+"&showallag=1#grouplist",
                  body=Data.anime_page,match_querystring=True,status=200)

    try:
        # DB setup: committing inside a test is bad practice, but the
        # assertion below needs a primary key on `expected`.
        self.db.session.add(expected)
        self.db.session.commit()
        self.assertIsNotNone(expected.id)

        # Call-form print works on both Python 2 and Python 3; the original
        # print statements are a SyntaxError on Python 3.
        print("anime pk: "+str(expected.id))
        response = server.get_anime(expected.id)
        print(str(response))
        self.assertEqual(response[1],200)
        # Known bug: subgroup foreign key is a string, needs to be integer.
    finally:
        # Close the session even when an assertion above fails.
        self.db.session.close()
开发者ID:adeptues,项目名称:weaboo,代码行数:25,代码来源:tests.py
示例18: test_redirect_fallback
def test_redirect_fallback():
    """Check the authorized view redirects to the site root.

    The blueprint is built without any explicit redirect option, and the
    access-token endpoint is mocked.
    """
    provider = "https://example.com"
    responses.add(
        responses.POST,
        "https://example.com/oauth/access_token",
        body="oauth_token=xxx&oauth_token_secret=yyy",
    )

    oauth_bp = OAuth1ConsumerBlueprint(
        "test-service",
        __name__,
        client_key="client_key",
        client_secret="client_secret",
        base_url=provider,
        request_token_url="https://example.com/oauth/request_token",
        access_token_url="https://example.com/oauth/access_token",
        authorization_url="https://example.com/oauth/authorize",
    )

    app = flask.Flask(__name__)
    app.secret_key = "secret"
    app.register_blueprint(oauth_bp, url_prefix="/login")

    with app.test_client() as client:
        resp = client.get(
            "/login/test-service/authorized?oauth_token=foobar&oauth_verifier=xyz",
            base_url="https://a.b.c",
        )
        # The client must be redirected to the root of the requesting host.
        assert resp.status_code == 302
        assert resp.headers["Location"] == "https://a.b.c/"
开发者ID:singingwolfboy,项目名称:flask-dance,代码行数:28,代码来源:test_oauth1.py
示例19: testLimitsViaHeadersNoSleep
def testLimitsViaHeadersNoSleep(self):
    """Check rate limits are read from response headers.

    An ``Api`` is created with ``sleep_on_rate_limit=False``. The
    rate-limit status endpoint and ``DEFAULT_URL`` are mocked, the latter
    with ``HEADERS`` attached. After one ``GetSearch`` call the test
    asserts the limits recorded for ``/search/tweets`` and that an
    untouched endpoint reports 15/15.
    """
    api = twitter.Api(
        consumer_key='test',
        consumer_secret='test',
        access_token_key='test',
        access_token_secret='test',
        sleep_on_rate_limit=False)

    # Get initial rate limit data to populate api.rate_limit object
    with open('testdata/ratelimit.json') as f:
        resp_data = f.read()

    url = '%s/application/rate_limit_status.json' % self.api.base_url
    responses.add(responses.GET, url, body=resp_data, match_querystring=True)

    # Add a test URL just to have headers present
    responses.add(
        method=responses.GET, url=DEFAULT_URL, body='{}', adding_headers=HEADERS)

    # The return value is not needed; the call is made so that
    # api.rate_limit gets populated.
    api.GetSearch(term='test')

    self.assertTrue(api.rate_limit.__dict__)
    search_limit = api.rate_limit.get_limit('/search/tweets')
    self.assertEqual(search_limit.limit, 63)
    self.assertEqual(api.rate_limit.get_limit('/search/tweets').remaining, 63)
    self.assertEqual(api.rate_limit.get_limit('/search/tweets').reset, 626672700)

    # No other resource families should be set during above operation.
    test_url = '/lists/subscribers/show.json'
    self.assertIsNone(
        api.rate_limit.__dict__.get('resources').get(test_url)
    )

    # But if we check them, it should go to default 15/15
    self.assertEqual(api.rate_limit.get_limit(test_url).remaining, 15)
    self.assertEqual(api.rate_limit.get_limit(test_url).limit, 15)
开发者ID:bear,项目名称:python-twitter,代码行数:32,代码来源:test_rate_limit.py
示例20: test_signal_oauth_error_authorized
def test_signal_oauth_error_authorized(request):
    """Check ``oauth_error`` fires once when the access-token call fails.

    The access-token endpoint is mocked to answer 401. The test records
    every ``oauth_error`` emission and asserts the sender, the message and
    that the client is still redirected.
    """
    responses.add(
        responses.POST,
        "https://example.com/oauth/access_token",
        body="Invalid request token.",
        status=401,
    )
    app, bp = make_app()
    received = []

    def record_signal(*args, **kwargs):
        received.append((args, kwargs))

    oauth_error.connect(record_signal)
    # Disconnect at teardown so the handler does not outlive this test.
    request.addfinalizer(lambda: oauth_error.disconnect(record_signal))

    authorized_path = (
        "/login/test-service/authorized?"
        "oauth_token=faketoken&"
        "oauth_token_secret=fakesecret&"
        "oauth_verifier=fakeverifier"
    )
    with app.test_client() as client:
        resp = client.get(authorized_path, base_url="https://a.b.c")

    assert len(received) == 1
    signal_args, signal_kwargs = received[0]
    assert signal_args == (bp,)
    expected_message = (
        "Token request failed with code 401, response was 'Invalid request token.'."
    )
    assert signal_kwargs["message"] == expected_message
    assert resp.status_code == 302
开发者ID:singingwolfboy,项目名称:flask-dance,代码行数:33,代码来源:test_oauth1.py
注:本文中的responses.add函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论