本文整理汇总了Python中requests.delete函数的典型用法代码示例。如果您正苦于以下问题:requests.delete函数的具体用法?requests.delete怎么用?requests.delete使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了delete函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: remove_units
def remove_units(self, units, app_name):
    """Send a DELETE to the ``units`` endpoint of the app ``app_name``.

    ``str(units)`` is sent as the request body together with the
    authorization headers stored on the instance. The response is discarded.
    """
    endpoint = "{0}/apps/{1}/units".format(settings.TSURU_HOST, app_name)
    requests.delete(endpoint, headers=self.authorization, data=str(units))
开发者ID:rpeterson,项目名称:tsuru-dashboard,代码行数:7,代码来源:views.py
示例2: tearDown
def tearDown(self):
    """Delete the last server of the last database, then that database.

    Both DELETE calls are asserted to answer with HTTP 200. When the
    database listing or the server listing is falsy, a message is printed
    and nothing is deleted.
    """
    # NOTE: never passed to any request below; kept as in the original.
    headers = {'Content-Type': 'application/json; charset=utf-8'}

    db_listing = requests.get(__db_url__).json()
    if not db_listing:
        print("The database list is empty")
        return

    last_db_id = db_listing['databases'][-1]['id']
    servers_url = 'http://%s:8000/api/1.0/databases/%u/servers/' % (
        __host_or_ip__, last_db_id)

    server_listing = requests.get(servers_url).json()
    if not server_listing:
        print("The Server list is empty")
        return

    last_server_id = server_listing['members'][-1]['id']
    print("ServerId to be deleted is " + str(last_server_id))

    # The server goes first ...
    server_reply = requests.delete(servers_url + str(last_server_id))
    self.assertEqual(server_reply.status_code, 200)

    # ... then the database that owned it.
    db_reply = requests.delete(__db_url__ + str(last_db_id))
    self.assertEqual(db_reply.status_code, 200)
开发者ID:jango2015,项目名称:voltdb,代码行数:29,代码来源:start_stop_database_test.py
示例3: main
def _report_stack(peg):
    """Print the size and the top disk the stack service reports for ``peg``."""
    stack_url = createStackUrl + '/' + str(peg)
    size = requests.get(stack_url + '/size', auth=(username, password))
    top = requests.get(stack_url + '/peek', auth=(username, password))
    print('stack ' + str(peg) + ' has ' + size.text
          + ' disks with the disk ' + top.text + ' on top')


def main(argv):
    """Run the Towers of Hanoi demo against the remote stack service.

    Args:
        argv: Command-line arguments without the program name. ``-d N``
            selects the number of disks; 3 is used when the option is
            absent or the arguments cannot be parsed.
    """
    # Default promised by the message below; without it a run without -d
    # reached prebuild() with the name unbound.
    numberOfDisks = 3
    try:
        opts, args = getopt.getopt(argv, "hd:g:", ["-d"])
    except getopt.GetoptError:
        print('No option for disks added, using default of 3 Disks')
        opts = []  # the loop below must still have something to iterate
    for opt, arg in opts:
        if opt == '-d':
            print(arg + ' disks entered')
            numberOfDisks = int(arg)

    # run our prebuild
    pegs = prebuild(numberOfDisks)

    # print our starting disks
    _report_stack(pegs[0])

    # run hanoi
    hanoi(numberOfDisks, pegs[0], pegs[1], pegs[2])

    # print our final disks
    _report_stack(pegs[2])

    # do cleanups
    for peg in pegs:
        requests.delete(createStackUrl + '/' + str(peg) + '/clear',
                        auth=(username, password))
开发者ID:TheBoredMage,项目名称:Demos,代码行数:34,代码来源:run_towers.py
示例4: tearDownModule
def tearDownModule():
    """Remove what this test module left behind.

    Deletes the index named under ``DISEASE``/``index`` in the ini file,
    removes the ini file, and removes the ``STAGE`` directory below the test
    data directory when it exists.
    """
    ini = IniParser().read_ini(MY_PUB_INI_FILE)
    index_url = ElasticSettings.url() + '/' + ini['DISEASE']['index']
    requests.delete(index_url)

    os.remove(MY_PUB_INI_FILE)

    stage_dir = TEST_DATA_DIR + '/STAGE'
    if os.path.exists(stage_dir):
        shutil.rmtree(stage_dir)
开发者ID:D-I-L,项目名称:django-data-pipeline,代码行数:7,代码来源:tests_download.py
示例5: absent
def absent(name, profile='grafana'):
    '''
    Ensure that a data source is absent.

    name
        Name of the data source to remove.

    profile
        Either a mapping of settings or the name of a configuration option
        that is looked up through ``config.option``.
    '''
    if isinstance(profile, string_types):
        profile = __salt__['config.option'](profile)

    outcome = {'result': None, 'comment': None, 'changes': None}
    existing = _get_datasource(profile, name)

    if existing:
        # The reply of the DELETE call is not inspected.
        requests.delete(
            _get_url(profile, existing['id']),
            headers=_get_headers(profile),
            timeout=profile.get('grafana_timeout', 3),
        )
        message = 'Data source {0} was deleted'
    else:
        message = 'Data source {0} already absent'

    outcome['result'] = True
    outcome['comment'] = message.format(name)
    return outcome
开发者ID:HowardMei,项目名称:saltstack,代码行数:28,代码来源:grafana_datasource.py
示例6: test_timed_cycle
def test_timed_cycle(self):
    # Disabled debug test: the bare ``return`` on the next line means none
    # of the code below it ever executes.
    # NOTE(review): the nesting below was reconstructed from a listing that
    # had lost its indentation -- confirm against the upstream file.
    return # basically a debug test for thread pool bit
    target = 'http://requestbin.zapier.com/api/v1/bin/test_timed_cycle'
    # One hook per comment event, all pointing at the same target URL.
    hooks = [self.make_hook(event, target) for event in ['comment.added', 'comment.changed', 'comment.removed']]
    for n in range(4):
        early = datetime.now()
        # fires N * 3 http calls
        for x in range(10):
            comment = Comment.objects.create(
                site=self.site,
                content_object=self.user,
                user=self.user,
                comment='Hello world!'
            )
            comment.comment = 'Goodbye world...'
            comment.save()
            comment.delete()
        # Time spent creating, changing and removing the ten comments.
        total = datetime.now() - early
        print(total)
        # Poll until the send counter reaches 30 per completed round.
        while True:
            response = requests.get(target + '/view')
            # ``json`` is read as an attribute here, not called.
            sent = response.json
            if sent:
                print(len(sent), models.async_requests.total_sent)
            if models.async_requests.total_sent >= (30 * (n+1)):
                time.sleep(5)
                break
            time.sleep(1)
    requests.delete(target + '/view') # cleanup to be polite
开发者ID:TexasFile,项目名称:django-rest-hooks,代码行数:34,代码来源:tests.py
示例7: publish
def publish(bulk, endpoint, rebuild, mapping):
    """Push the dataType mapping and documents to the ``dco`` index.

    Args:
        bulk: Request body for the ``_bulk`` import.
        endpoint: Base URL of the search server.
        rebuild: When truthy, delete and re-create the ``dco`` index first.
        mapping: Path of the file holding the dataType mapping document.

    Raises:
        requests.HTTPError: via ``raise_for_status`` when a request that is
            not allowed to fail answers with an error status.
    """
    ok = requests.codes.ok

    # if configured to rebuild_index
    # Delete and then re-create to dataType index (via PUT request)
    index_url = endpoint + "/dco"
    if rebuild:
        requests.delete(index_url)
        r = requests.put(index_url)
        if r.status_code != ok:
            print(r.url, r.status_code)
            r.raise_for_status()

    # push current dataType document mapping
    mapping_url = endpoint + "/dco/datatype/_mapping"
    # Read the mapping once as bytes: a file object handed to requests is
    # consumed by the first PUT, so the retry would send an empty body.
    with open(mapping, "rb") as mapping_file:
        mapping_body = mapping_file.read()
    r = requests.put(mapping_url, data=mapping_body)
    if r.status_code != ok:
        # new mapping may be incompatible with previous
        # delete current mapping and re-push
        requests.delete(mapping_url)
        r = requests.put(mapping_url, data=mapping_body)
        if r.status_code != ok:
            print(r.url, r.status_code)
            r.raise_for_status()

    # bulk import new dataType documents
    bulk_import_url = endpoint + "/_bulk"
    r = requests.post(bulk_import_url, data=bulk)
    if r.status_code != ok:
        print(r.url, r.status_code)
        r.raise_for_status()
开发者ID:tetherless-world,项目名称:dco-elasticsearch,代码行数:35,代码来源:ingest-datatypes.py
示例8: rebuild_index
def rebuild_index(self):
    """Delete the record and config types, then recreate index and mappings.

    The index is created only when its ``_mapping`` lookup answers 404. The
    two type mappings are pushed only when a mapping is configured and the
    record type's ``_mapping`` lookup answers 404. Progress is printed.
    """
    def show(reply):
        print("...response from: %s (%s)" % (reply.url, reply.status_code))

    # (type name key, type URI key) for the record and config types
    type_keys = (('type_record', 'uri_records'), ('type_config', 'uri_configs'))

    # check that ElasticSearch is awake
    self.check_index()
    es = Config.elasticsearch

    for name_key, uri_key in type_keys:
        print("Deleting type: %s" % (es[name_key]))
        show(requests.delete(es[uri_key]))

    # check to see if index exists - in which case it will have a mapping
    # even if it is empty, create if not
    if requests.get(es['uri_index'] + '/_mapping').status_code == 404:
        print("Creating index: %s" % (es['index']))
        show(requests.post(es['uri_index']))

    if not es['mapping']:
        print("Warning: no elasticsearch mapping defined in Config.py.")
        return

    # check for mapping and create one if provided and does not already exist
    # this will automatically create the necessary index type if it is not
    # already there
    if requests.get(es['uri_records'] + '_mapping').status_code != 404:
        return
    for name_key, uri_key in type_keys:
        print("Creating mapping for type: %s" % (es[name_key]))
        show(requests.put(es[uri_key] + '_mapping', data=json.dumps(es['mapping'])))
开发者ID:HeinrichHartmann,项目名称:OpenCitationsCorpus,代码行数:32,代码来源:OpenCitationsImportLibrary.py
示例9: test_upload_and_download_node_GET_gzip
def test_upload_and_download_node_GET_gzip():
    """Upload a node, then download it with ``compression=gzip``.

    Asserts that the node can be fetched and that the compressed download
    does not start with the plain file content. The node is deleted at the
    end even when an assertion fails.
    """
    # download file in compressed format, works with all the above options
    # curl -X GET http://<host>[:<port>]/node/<node_id>?download&compression=<zip|gzip>
    auth_headers = {"Authorization": "OAuth {}".format(TOKEN)}

    # upload node
    upload_url = "{SHOCK_URL}/node".format(SHOCK_URL=SHOCK_URL)
    # The context manager closes the upload handle the original left open.
    with open(DATADIR + 'CCC.txt', 'rb') as upload:
        files = {'upload': upload}
        if DEBUG: print("POST", upload_url, auth_headers, files)
        response = requests.post(upload_url, headers=auth_headers, files=files)
    data = json.loads(response.content.decode("utf-8"))
    node_id = data["data"]["id"]
    node_url = SHOCK_URL + "/node/{}".format(node_id)

    try:
        # test my node exists
        if DEBUG: print("GET", node_url, auth_headers)
        response = requests.get(node_url, headers=auth_headers)
        data = json.loads(response.content.decode("utf-8"))
        assert data["status"] == 200

        # Download node
        download_url = SHOCK_URL + "/node/{}?download&compression=gzip".format(node_id)
        response = requests.get(download_url, headers=auth_headers)
        assert response.content[0:3] != b"CCC"
    finally:
        # cleanup
        requests.delete(node_url, headers=auth_headers)
开发者ID:wgerlach,项目名称:Shock,代码行数:26,代码来源:test_shock.py
示例10: delete_hooks
def delete_hooks(orgOrUser, repoOrRepos, hook_type=None):
    """Delete every hook that ``list_hooks`` returns for the given arguments.

    Each returned entry is expanded into the hook URL template as keyword
    arguments, with ``owner`` set to ``orgOrUser``.
    """
    entries = list_hooks(orgOrUser, repoOrRepos, hook_type)
    pattern = 'https://api.github.com/repos/{owner}/{repo}/hooks/{hook}'
    credentials = HTTPBasicAuth(*get_auth())
    for entry in entries:
        hook_url = pattern.format(owner=orgOrUser, **entry)
        requests.delete(hook_url, auth=credentials)
开发者ID:cnuber,项目名称:gitbot,代码行数:7,代码来源:hooks.py
示例11: test_delete_server
def test_delete_server(self):
    """Delete the last server of the last database and check the outcome.

    A 403 reply must carry the "Cannot delete a running server" status
    string. Any other reply must be 204, after which the database itself is
    deleted and must answer 204 as well.
    """
    response = requests.get(__db_url__)
    value = response.json()
    if value:
        last_db_id = value['databases'][-1]['id']
        servers_url = 'http://%s:8000/api/1.0/databases/%u/servers/' % \
            (__host_or_ip__, last_db_id)
        response = requests.get(servers_url)
        value = response.json()
        if value:
            last_server_id = value['members'][-1]['id']
            print("ServerId to be deleted is " + str(last_server_id))
            response = requests.delete(servers_url + str(last_server_id))
            if response.status_code == 403:
                # Read the message from the DELETE reply; the original
                # looked it up in the earlier server-list payload.
                refusal = response.json()
                print(refusal['statusstring'])
                self.assertEqual(refusal['statusstring'],
                                 'Cannot delete a running server')
            else:
                self.assertEqual(response.status_code, 204)
                db_url = __db_url__ + str(last_db_id)
                response = requests.delete(db_url)
                self.assertEqual(response.status_code, 204)
        else:
            print("The Server list is empty")
开发者ID:jianlirong,项目名称:voltdb,代码行数:32,代码来源:serverTest.py
示例12: unsubscribe
def unsubscribe(listname, address):
    """Send a DELETE for ``address`` to the URL of list ``listname``.

    Returns:
        ``False`` when the request raises a ``RequestException``, ``True``
        otherwise. The response status is not inspected.
    """
    list_url = get_url(listname)
    try:
        requests.delete(list_url, timeout=TIMEOUT, data={'address': address})
    except requests.exceptions.RequestException:
        succeeded = False
    else:
        succeeded = True
    return succeeded
开发者ID:alexandreab,项目名称:colab,代码行数:7,代码来源:mailman.py
示例13: test_job_states
def test_job_states(hostname, large_file):
    """
    Test that after requesting an action, the job will update and eventually wind up in status "done"

    :param hostname: The hostname under test (this fixture is automatically injected by pytest)
    :param large_file: A large-ish filename (this fixture is automatically injected by pytest)
    """
    # Binary mode: the upload is image data, which must not go through text
    # decoding or newline translation.
    with open(large_file, 'rb') as f:
        resp = requests.post(hostname + '/images',
                             data={'user_id': 'test-user-{}'.format(uuid.uuid4())},
                             files={'file': ('bridge.jpeg', f)})
    img_id = resp.json()['id']
    image_url = hostname + '/image/{}'.format(img_id)

    # Send the resize, crop and transcode requests in that order, keeping
    # each job_id so the job state can be checked afterwards.
    actions = (
        {'action': 'resize', 'size': '50,50'},
        {'action': 'crop', 'box': '50,50,100,100'},
        {'action': 'transcode', 'extension': 'bmp'},
    )
    job_ids = [requests.put(image_url, data=action).json()['job_id']
               for action in actions]

    for job_id in job_ids:
        wait_for_job_done(hostname + '/job/{}'.format(job_id))

    # Clean up test data and delete the image
    requests.delete(image_url)
开发者ID:justiniso,项目名称:jw-performance,代码行数:32,代码来源:test_jobs.py
示例14: update_tracker
def update_tracker(session_token, download_id, tracker):
    """Replace a download's tracker with the same announce URL on NEW_TRACKER_HOST.

    Args:
        session_token: Value sent in the ``X-Fbx-App-Auth`` header.
        download_id: Numeric id of the download whose tracker is replaced.
        tracker: Mapping with the current announce URL under ``'announce'``.

    Each reply is handed to ``get_api_result``.
    """
    auth_headers = {'X-Fbx-App-Auth': session_token}

    announce_url = tracker['announce']
    parts = list(urlparse(announce_url))
    parts[1] = NEW_TRACKER_HOST  # index 1 of the parse result is the netloc
    new_announce = urlunparse(parts)
    print("> UPDATE tracker %s ==> %s" % (announce_url, new_announce))

    trackers_url = MAFREEBOX_API_URL + ("downloads/%d/trackers" % download_id)

    # add new tracker
    rep = requests.post(trackers_url, json={
        'announce': new_announce,
        'is_enabled': True
    }, headers=auth_headers)
    get_api_result(rep)

    # remove prev tracker
    rep = requests.delete(trackers_url + "/" + quote(announce_url, safe=''),
                          headers=auth_headers)
    get_api_result(rep)

    # Activate the new tracker. This is an update, so it is sent as PUT;
    # the original issued DELETE here, which targets the tracker added above.
    # NOTE(review): confirm the verb against the Freebox API reference.
    rep = requests.put(trackers_url + "/" + quote(new_announce, safe=''),
                       json={'is_enabled': True},
                       headers=auth_headers)
    get_api_result(rep)
开发者ID:r0ro,项目名称:t411-fbx-tracker-update,代码行数:31,代码来源:update_tracker.py
示例15: check_local
def check_local():
    """React to a build notification for the ``yang-catalog`` owner.

    Reads the JSON body of the current request. For a passed ``push`` build
    a pull request is opened on ``yang_models_url``. For a passed
    ``pull_request`` build the pull number is logged and a DELETE is sent
    to ``yang_models_url``. All other notifications are ignored.
    """
    global yang_models_url
    body = request.json
    if body['repository']['owner_name'] != 'yang-catalog':
        return
    if body['result_message'] != 'Passed':
        return

    auth_headers = {'Authorization': 'token ' + token}
    if body['type'] == 'push':
        # After build was successful only locally
        # Pass a plain dict: requests serializes ``json=`` itself, whereas
        # the original wrapped it in jsonify(), presumably Flask's, whose
        # return value is not JSON-serializable.
        json_body = {
            "title": "Cron job - every day pull of ietf draft yang files.",
            "body": "ietf extracted yang modules",
            "head": "yang-catalog:master",
            "base": "master"
        }
        requests.post(yang_models_url + '/pulls', json=json_body,
                      headers=auth_headers)
    if body['type'] == 'pull_request':
        # If build was successful on pull request
        pull_number = body['pull_request_number']
        log.write('pull request was successful %s' % repr(pull_number))
        # NOTE(review): the merge request that used to precede this call is
        # disabled upstream; only the DELETE on the repository URL remains.
        requests.delete(yang_models_url, headers=auth_headers)
开发者ID:mjethanandani,项目名称:yang,代码行数:25,代码来源:api.py
示例16: acl_consul
def acl_consul(acl_consul_instance):
    """Yield the ``(port, token)`` pair as an ``ACLConsul`` named tuple.

    Once the consumer resumes the generator, a recursive DELETE is sent to
    the ``kv/`` endpoint on that port so the instance starts clean again.
    """
    ACLConsul = collections.namedtuple('ACLConsul', ['port', 'token'])
    port, token = acl_consul_instance
    handle = ACLConsul(port=port, token=token)
    yield handle

    # remove all data from the instance, to have a clean start
    base_uri = 'http://127.0.0.1:%s/v1/' % port
    wipe_url = base_uri + 'kv/?recurse=1'
    requests.delete(wipe_url)
开发者ID:heartpandora,项目名称:python-consul,代码行数:7,代码来源:conftest.py
示例17: __init__
def __init__(self, url, index, mappings = None, clean = False):
    '''Connect to the index at ``url``, creating or resetting it as needed.

    clean: remove already existing index
    mappings: when truthy, passed to ``create_mappings`` at the end
    '''
    self.url = url
    # Valid index for elastic
    self.index = self.safe_index(index)
    self.index_url = self.url+"/"+self.index
    self.max_items_bulk = 100
    self.wait_bulk_seconds = 2  # time to wait to complete a bulk operation

    try:
        probe = requests.get(self.index_url)
    except requests.exceptions.ConnectionError:
        raise ElasticConnectException()

    if probe.status_code == 200:
        # Index already there: recreate it only on request.
        if clean:
            requests.delete(self.index_url)
            requests.post(self.index_url)
            logging.info("Deleted and created index " + self.index_url)
    else:
        # Index does not exist yet
        created = requests.post(self.index_url)
        if created.status_code != 200:
            logging.info("Can't create index %s (%s)" %
                         (self.index_url, created.status_code))
            raise ElasticWriteException()
        logging.info("Created index " + self.index_url)

    if mappings:
        self.create_mappings(mappings)
开发者ID:pombredanne,项目名称:GrimoireELK,代码行数:31,代码来源:elastic.py
示例18: remove_user
def remove_user():
    """Remove the login and user named in the request, then log the session out.

    Expects ``login`` and ``code`` in the request's JSON body.

    Returns:
        The text of the logout reply on success; otherwise a 400 response
        whose body is the message of whatever exception was raised.
    """
    try:
        data_json = request.get_json()
        if not check_arguments(['login', 'code'], data_json):
            raise Exception("Bad arguments")
        login = data_json['login']
        code = data_json['code']
        if not check_connect(login, code):
            raise Exception("Access denied")

        body = {'login': login}
        json_headers = {'Content-type': 'application/json'}
        # The login record is removed first, then the user record.
        for action in ("remove_login", "remove_user"):
            reply = requests.delete(get_users_url(action),
                                    data=json.dumps(body),
                                    headers=json_headers).json()
            if 'error' in reply:
                raise Exception(reply['error'])

        logout_url = get_session_url("logout?login={0}&code={1}".format(login, code))
        return requests.get(logout_url).text
    except Exception as e:
        return make_response(str(e), 400, {'olol':'ololol'})
开发者ID:olyakosyura,项目名称:RSOI,代码行数:31,代码来源:logic.py
示例19: test_collection_user_authorized_documents
def test_collection_user_authorized_documents(suite, collection_level_user, docs):
    """Exercise delete/post/get on the authorized-documents collection.

    Logs in as ``collection_level_user``, checks that ``delete_all=True``
    succeeds and ``delete_all=False`` is rejected, adds ``docs``, lists
    them, deletes everything again and logs out.
    """
    jwt = _login(collection_level_user)
    auth = _auth_header(jwt)
    collection_url = 'http://localhost:5000/api/authorized-app/authorized-documents'

    def wipe(confirm):
        # delete_all carries the caller's confirmation flag
        return requests.delete(collection_url, headers=auth,
                               params={'delete_all': confirm})

    # delete all documents in a collection
    assert wipe(True).ok

    # same request with delete_all set to false must be refused
    assert not wipe(False).ok

    # add an item to the collection
    created = requests.post(collection_url, headers=auth, data=json.dumps(docs))
    assert created.ok

    # get the first page of the docs added (one)
    listing = requests.get(collection_url + ';json', headers=auth)
    assert listing.ok

    # delete the doc that we added
    assert wipe(True).ok

    _logout(jwt)
开发者ID:JeffHeard,项目名称:sondra,代码行数:30,代码来源:test_requests_auth.py
示例20: _un_publish
def _un_publish(self, token):
    """Send a DELETE for the service ``token`` and drop it from ``pubdata``.

    Raises:
        KeyError: when ``token`` is not a key of ``self.pubdata``; the
            DELETE request has already been sent by then.
    """
    target = (self._server_ip, self._server_port, token)
    service_url = "http://%s:%s/service/%s" % target
    requests.delete(service_url, headers=self._headers)

    # remove token and obj from records
    del self.pubdata[token]
开发者ID:numansiddique,项目名称:contrail-controller,代码行数:7,代码来源:client.py
注:本文中的requests.delete函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论