本文整理汇总了Python中requests.put函数的典型用法代码示例。如果您正苦于以下问题:Python put函数的具体用法?Python put怎么用?Python put使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了put函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: add_attribute
def add_attribute(columns):
u = 'https://api.parse.com/1/classes/Attribute'
attrs_request = requests.get(u, data={
'limit': 500,
}, headers={
"X-Parse-Application-Id": keys.PARSE_APP_ID,
"X-Parse-REST-API-Key": keys.PARSE_REST_KEY
})
attribute_records = attrs_request.json()['results']
print 'Agregando a todos los registros'
for a in attribute_records:
columns_dict = {}
for column in columns:
if a.get(column) is None:
print "Agregar columna: ", column
columns_dict[column] = False
uu = 'https://api.parse.com/1/classes/Attribute/%s' % a.get('objectId')
requests.put(uu, data=json.dumps(columns_dict), headers={
"X-Parse-Application-Id": keys.PARSE_APP_ID,
"X-Parse-REST-API-Key": keys.PARSE_REST_KEY,
'Content-type': 'application/json'
})
print "Atributos agregados"
开发者ID:circleapp,项目名称:server,代码行数:28,代码来源:sqi.py
示例2: test_start_stop_server_with_different_ports
def test_start_stop_server_with_different_ports(self):
    """
    Ensure starting and stopping a server works when using ports other than
    the default ports.
    """
    # NOTE(review): the indentation of this snippet was lost in extraction;
    # the nesting below (in particular which `if` the final `elif` belongs
    # to) is a reconstruction -- confirm against the original file.
    response = requests.get(__db_url__)
    value = response.json()
    if value:
        # Operate on the last database in the returned list.
        db_length = len(value['databases'])
        last_db_id = value['databases'][db_length-1]['id']
        # '%u' behaves like '%d' in Python %-formatting.
        url = 'http://%s:8000/api/1.0/databases/%u/start' % \
            (__host_or_ip__,last_db_id)
        response = requests.put(url)
        print "Starting...."
        value = response.json()
        if not value['statusstring']:
            print "error"
        elif "Start request sent successfully to servers" in value['statusstring']:
            self.assertEqual(response.status_code, 200)
            # Fixed pauses before and after checking the 'running' state.
            time.sleep(5)
            CheckServerStatus(self, last_db_id, 'running')
            time.sleep(10)
            print "Stopping...."
            url_stop = 'http://%s:8000/api/1.0/databases/%u/stop' % \
                (__host_or_ip__,last_db_id)
            response = requests.put(url_stop)
            value = response.json()
            # A "Connection broken" status string is treated as the
            # successful outcome of the stop request here.
            if "Connection broken" in value['statusstring']:
                self.assertEqual(response.status_code, 200)
                time.sleep(10)
                CheckServerStatus(self, last_db_id, 'stopped')
        elif response.status_code == 500:
            # This assertion restates the branch condition, so it cannot fail.
            self.assertEqual(response.status_code, 500)
开发者ID:jango2015,项目名称:voltdb,代码行数:34,代码来源:start_stop_database_test.py
示例3: demo_store_secret_two_step_binary
def demo_store_secret_two_step_binary():
    """Store a secret in two steps (POST metadata, then PUT payload) and read it back.

    Returns the ``secret_ref`` taken from the JSON reply to the metadata POST.
    """
    # Base64 text sent as the payload; it decodes to 'my-secret-here'.
    encoded_secret = 'bXktc2VjcmV0LWhlcmU='
    secrets_url = '/'.join([end_point, version, 'secrets'])

    # Step 1: POST an empty metadata document.
    created = requests.post(secrets_url, data=json.dumps({}), headers=hdrs)
    secret_ref = created.json().get('secret_ref')
    assert secret_ref

    # Step 2: PUT the payload to the returned reference.
    put_headers = dict(hdrs)
    put_headers['content-type'] = 'application/octet-stream'
    put_headers['content-encoding'] = 'base64'
    requests.put(secret_ref, data=encoded_secret, headers=put_headers)

    # Read the secret back as an octet stream and log the raw body.
    get_headers = dict(hdrs)
    get_headers['accept'] = 'application/octet-stream'
    fetched = requests.get(secret_ref, headers=get_headers)
    LOG.info('Get secret 2-step (binary): {0}\n'.format(fetched.content))
    return secret_ref
开发者ID:abattye,项目名称:barbican,代码行数:28,代码来源:demo_requests.py
示例4: update_article
def update_article(article, checked_urls):
    """Attach the checked links and the Linkitup tag/link to a figshare article.

    Each distinct ``uri`` in ``checked_urls`` has its ``web`` address PUT to
    the article's links once. Afterwards the "Enriched with Linkitup" tag and
    the Linkitup link are added, and ``publish_nanopublication`` is called.

    NOTE(review): indentation was lost in extraction; the tag/link requests
    are assumed to run once, after the loop -- confirm.
    """
    article_id = article["article_id"]
    oauth = OAuth1(
        client_key,
        client_secret=client_secret,
        resource_owner_key=g.user.oauth_token,
        resource_owner_secret=g.user.oauth_token_secret,
    )

    def put_json(resource, body):
        # All three requests share the same URL shape, headers and auth.
        return requests.put(
            "http://api.figshare.com/v1/my_data/articles/{}/{}".format(article_id, resource),
            data=json.dumps(body),
            headers={"content-type": "application/json"},
            auth=oauth,
        )

    seen_uris = []
    for entry in checked_urls.values():
        uri = entry["uri"]
        if uri in seen_uris:
            continue
        seen_uris.append(uri)
        reply = put_json("links", {"link": entry["web"]})
        outcome = json.loads(reply.content)
        app.logger.debug("Added {} with the following results:\n{}".format(uri, outcome))

    app.logger.debug("Tag with Linkitup")
    put_json("tags", {"tag_name": "Enriched with Linkitup"})

    app.logger.debug("Add a link to Linkitup")
    put_json("links", {"link": "http://linkitup.data2semantics.org"})

    app.logger.debug("Added enriched with Linkitup tag")
    publish_nanopublication(article, checked_urls, oauth)
    return
开发者ID:pombredanne,项目名称:linkitup,代码行数:60,代码来源:figshare.py
示例5: commit_file_code
def commit_file_code(self, file_name, obj_details):
    """Create ``file_name`` on the ``abap`` branch, or update it if it already exists.

    A PUT to ``settings.CONFIG_GITHUB_URL + file_name`` is attempted first.
    On 201 the JSON reply is stored in ``self.commit_response``. On 422 the
    remote entry is fetched and, when its ``sha`` differs from
    ``githash(<local content>)``, a second PUT carrying that ``sha`` is sent
    and its JSON reply is stored instead. ``self.log_to_db()`` is called at
    the end.

    NOTE(review): indentation was lost in extraction; ``self.log_to_db()`` is
    assumed to run unconditionally at function level -- confirm.
    """
    # Read the file once and close the handle. The original opened it twice
    # and never closed either handle.
    with open(file_name) as source:
        file_content = source.read()
    self.commit_response = {}
    self.email = obj_details.get("EMAIL")
    credentials = (settings.GIT_USERNAME, settings.GIT_PASSWORD)
    params = {
        'message': file_name + " created",
        'content': base64.b64encode(file_content),
        'branch': "abap",
        'path': file_name,
        'committer': {'name': "1", 'email': self.email},
    }
    url = settings.CONFIG_GITHUB_URL + file_name
    self.check_sleep_and_set_api_count()
    request_status = requests.put(url, auth=credentials, data=json.dumps(params))
    if request_status.status_code == 201:
        self.commit_response = request_status.json()
    elif request_status.status_code == 422:
        lookup = {'ref': 'abap', 'path': file_name}
        self.check_sleep_and_set_api_count()
        get_file = requests.get(url, auth=credentials, params=lookup).json()
        # Only push an update when the local content differs from the remote.
        if githash(file_content) != get_file['sha']:
            params['sha'] = get_file['sha']
            params['message'] = file_name + " updated"
            self.check_sleep_and_set_api_count()
            request_status = requests.put(url, auth=credentials, data=json.dumps(params))
            self.commit_response = request_status.json()
    self.log_to_db()
开发者ID:AvraGitHub,项目名称:GithubAPI,代码行数:30,代码来源:record_mgmt.py
示例6: upload
def upload(filepath):
    """Compute the MD5 of ``filepath``, then issue a PUT and a GET and return the GET's JSON.

    NOTE(review): this snippet looks unfinished. ``filenameSize``,
    ``filenameMD5``, ``true``, ``payload``, ``key``, ``method`` and ``url``
    are not defined inside this function -- presumably they are expected at
    module level; verify. As written, ``params``, ``filename`` and
    ``file_md5`` are never passed to either request.
    """
    # http://api.smugmug.com/services/api/?method=upload&version=1.3.0
    fstat = os.stat(filepath)
    filename = os.path.basename(filepath)
    hash_md5 = hashlib.md5()
    # Hash the file in 4096-byte chunks instead of reading it whole.
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    file_md5 = hash_md5.hexdigest()
    # NOTE(review): os.stat() results expose the size as `st_size`;
    # `fstat.ST_SIZE` looks like a typo. `true` is not a Python builtin
    # (`True` is). The first two keys are bare names, not strings -- confirm
    # the intended header names.
    params = {
        filenameSize: fstat.ST_SIZE,
        filenameMD5: file_md5,
        'X-Smug-AlbumID': None,
        'X-Smug-Caption': None,
        'X-Smug-Pretty': true,
        'X-Smug-SessionID': None,
        'X-Smug-Version': '1.3.0'
    }
    # This PUT sends no body and no headers; its response is discarded.
    requests.put('http://upload.smugmug.com/test.jpg')
    payload['APIKey'] = key
    payload['Pretty'] = True
    payload['method'] = method
    #payload['SessionID'] = secret
    #payload = { 'APIKey':
    req = requests.get(url, params=payload)
    #print "fetched url: %s" % req.url
    return req.json()
开发者ID:tedder,项目名称:smugmug-website-builder,代码行数:29,代码来源:mug.py
示例7: editProfile
def editProfile(self,gender,bio,website,name,location,username,email,birthday):
    """Update the current user's profile and refresh ``self.info``.

    gender: 0 = male, 1 = female.
    birthday: ``YYYY-MM-DD`` string (e.g. 1990-02-01); "T05:00:00Z" is
        appended before sending.

    Returns True when the server answers with ``requests.codes.ok`` (in which
    case ``self.info`` is replaced by the decoded JSON body), False otherwise.
    """
    payload = {
        "user[gender]": gender,
        "user[bio]" : bio,
        "user[website]" : website,
        "user[name]" : name,
        "user[location]" : location,
        "user[username]" : username,
        "user[email]" : email,
        "user[birthday]" : birthday + "T05:00:00Z"
    }
    headers = {
        "accept" : "application/json",
        "accept-language" : "en",
        "connection" : "keep-alive",
        "x-user-authentication-token" : self.authToken,
        "x-client-version" : self.clientVersion,
        "x-device-id" : self.deviceId,
        "user-agent" : self.useragent
    }
    # proxies=None is what requests uses by default, so a single call covers
    # both the proxied and the direct case.
    r = requests.put(self.base + "/users/" + self.info['_id'],
                     headers=headers, data=payload, proxies=self.proxies)
    if r.status_code != requests.codes.ok:
        return False
    # r.text is already decoded text. Calling .decode('utf8') on it forces an
    # implicit ASCII encode on Python 2 (failing on non-ASCII profiles) and
    # does not exist on Python 3.
    self.info = json.loads(r.text)
    return True
开发者ID:Pilfer,项目名称:GifboomAPI,代码行数:35,代码来源:gifboom.py
示例8: put_mapping
def put_mapping(mapping_dict):
for index in mapping_dict:
if not index.startswith('_'):
# create the index first
i = config['ELASTIC_SEARCH_HOST']
i += '/' + index
ri = requests.put(i)
if ri.status_code != 200:
print 'Failed to create Index:', index, ', HTTP Response:', ri.status_code
print ri.text
sys.exit(3)
# now create each type inside the index
for key, mapping in mapping_dict[index]['mappings'].iteritems():
im = i + '/' + key + '/_mapping'
exists = requests.get(im)
# do not overwrite existing mappings
if exists.status_code != 200:
themapping = {}
themapping[key] = mapping
r = requests.put(im, json.dumps(themapping))
if r.status_code != 200:
print 'Failed to do PUT mapping for Index:', index, ', Key:', key, ', HTTP Response:', r.status_code
sys.exit(4)
else:
print 'Mapping OK for Index:', index, ', Key:', key, ', HTTP Response:', r.status_code
else:
print 'Mapping already exists for Index:', index, ', Key:', key
else:
print 'Ignoring {0}, no index names start with _'.format(index)
开发者ID:CottageLabs,项目名称:sysadmin,代码行数:29,代码来源:bulk_restore.py
示例9: updateLSPs
def updateLSPs(linkPathDict, linkDict):
    """Push new explicit routes for the LSPs named in ``linkPathDict``.

    linkPathDict: maps an LSP name to its path, a sequence of nodes.
    linkDict: passed through to ``getZNodeIpAddress`` for every hop.

    Fetches the current LSP list, builds a reduced record (``from``, ``to``,
    ``name``, ``lspIndex``, ``pathType`` plus the planned ERO) for every LSP
    whose name is a key of ``linkPathDict``, and sends them in one bulk PUT.
    Returns immediately when ``linkPathDict`` is empty.
    """
    if not linkPathDict:
        return

    lsps_url = 'https://10.10.2.25:8443/NorthStar/API/v1/tenant/1/topology/1/te-lsps/'
    # NOTE(review): verify=False disables TLS certificate verification.
    r = requests.get(lsps_url, headers=authHeader, verify=False)

    new_lsps = []
    # r.json() already returns plain Python objects; the former
    # dumps/loads round-trip added nothing.
    for lsp in r.json():
        if lsp['name'] not in linkPathDict:
            continue
        path = linkPathDict[lsp['name']]
        # One ERO hop per consecutive pair of nodes on the path.
        ero = [
            {'topoObjectType': 'ipv4',
             'address': getZNodeIpAddress(here, there, linkDict)}
            for here, there in zip(path, path[1:])
        ]
        # Fill only the required fields.
        new_lsp = {}
        for key in ('from', 'to', 'name', 'lspIndex', 'pathType'):
            new_lsp[key] = lsp[key]
        new_lsp['plannedProperties'] = {'ero': ero}
        new_lsps.append(new_lsp)

    requests.put(lsps_url + 'bulk', json=new_lsps, headers=authHeader, verify=False)
开发者ID:Jeremy-WEI,项目名称:sdn-throwdown,代码行数:26,代码来源:function_util.py
示例10: _push_metadata_software_deployments
def _push_metadata_software_deployments(
        self, cnxt, server_id, stack_user_project_id):
    """Store the server's deployments in its resource metadata and publish them.

    Does nothing when no resource matches ``server_id``. Raises
    ``exception.ConcurrentTransaction`` when the resource update reports no
    rows changed. Afterwards the metadata is PUT to the resource's
    ``metadata_put_url`` and/or posted to its ``metadata_queue_id`` queue,
    whichever of the two resource-data entries is set.
    """
    rs = db_api.resource_get_by_physical_resource_id(cnxt, server_id)
    if not rs:
        return

    deployments = self.metadata_software_deployments(cnxt, server_id)
    md = rs.rsrc_metadata or {}
    md['deployments'] = deployments
    updated = db_api.resource_update(
        cnxt, rs.id, {'rsrc_metadata': md}, rs.atomic_key)
    if not updated:
        raise exception.ConcurrentTransaction(
            action="deployments of server %s" % server_id)

    # Pick the two delivery targets out of the resource data.
    put_url = None
    queue_id = None
    for item in rs.data:
        if item.key == 'metadata_put_url':
            put_url = item.value
        elif item.key == 'metadata_queue_id':
            queue_id = item.value

    if put_url:
        requests.put(put_url, jsonutils.dumps(md))
    if queue_id:
        token = self._get_user_token(cnxt, rs, stack_user_project_id)
        zaqar_plugin = cnxt.clients.client_plugin('zaqar')
        zaqar = zaqar_plugin.create_for_tenant(stack_user_project_id, token)
        zaqar.queue(queue_id).post(
            {'body': md, 'ttl': zaqar_plugin.DEFAULT_TTL})
开发者ID:jasondunsmore,项目名称:heat,代码行数:31,代码来源:service_software_config.py
示例11: move_card
def move_card(self, card, list):
    """Move ``card`` to the list ``list`` by PUTting the list id to the card's idList.

    TODO: the original author notes that this doesn't work.
    """
    endpoint = ''.join([BASE_URL, 'cards/', card['id'], '/idList'])
    query = self.request_params({'value': list['id']})
    requests.put(endpoint, params=query)
开发者ID:Natman64,项目名称:Trellonos,代码行数:7,代码来源:trellotools.py
示例12: putRequest
def putRequest(queue, payload=None):
    """Drain ``queue`` of resource URIs, PUT to each one and record the outcome.

    queue: object with ``empty()`` and ``get(timeout=...)`` yielding URIs.
    payload: optional request body sent with every PUT.

    For every URI a dict with the keys ``Node``, ``StatusCode`` and ``Data``
    is appended to ``GreenletRequests.NodeResponsesPost``. A timeout is
    recorded as status 408 and a connection error as status 404 with data
    "n/a". Control is yielded with ``gevent.sleep(0)`` after each request.
    """
    while not queue.empty():
        resourceURI = queue.get(timeout=1)
        # Fresh dict per request: the original reused one dict, so every
        # appended entry aliased the last result.
        response = {"Node": resourceURI}
        try:
            # data=None is the requests default, so one call covers both cases.
            r = requests.put(resourceURI, data=payload, timeout=20)
            if r.headers.get("Content-Type") == "application/json":
                # Call json(); the original stored the bound method itself.
                data = r.json()
            else:
                data = r.text
            response["StatusCode"] = r.status_code
            response["Data"] = data
        except requests.exceptions.Timeout:
            response["StatusCode"] = 408
            response["Data"] = {}
        except requests.exceptions.ConnectionError:
            # The original wrote this status to an unused dict, leaving the
            # recorded response without a StatusCode.
            response["StatusCode"] = 404
            response["Data"] = "n/a"
        GreenletRequests.NodeResponsesPost.append(response)
        print("Threaded PUT with ID " + str(GreenletRequests.npo) + " executed for " + resourceURI)
        GreenletRequests.npo += 1
        gevent.sleep(0)
开发者ID:ioan-dragan,项目名称:DICE-Project,代码行数:30,代码来源:greenletThreads.py
示例13: geo_locations_to_es
def geo_locations_to_es(self):
    """Bulk-upload ``self.geolocations`` to the ``/github/geolocations/_bulk`` endpoint.

    Entries are sent in packs of ``self.elastic.max_items_bulk``; whatever is
    left over (possibly nothing) is sent in a final request.
    """
    max_items = self.elastic.max_items_bulk
    url = self.elastic.url + "/github/geolocations/_bulk"

    logging.debug("Adding geoloc to %s (in %i packs)" % (url, max_items))

    entries = []
    for loc in self.geolocations:
        # Flush a full pack before adding the next entry.
        if len(entries) >= max_items:
            requests.put(url, data="".join(entries))
            entries = []

        location = self.geolocations[loc].copy()
        location["location"] = loc
        data_json = json.dumps(location)
        # Non-ASCII characters are dropped from the name used in the id.
        safe_loc = loc.encode('ascii', 'ignore').decode('ascii')
        geo_id = "%s-%s-%s" % (location["lat"], location["lon"], safe_loc)
        # One bulk entry = action line + document line.
        entries.append(
            ('{"index" : {"_id" : "%s" } }\n' % geo_id) + data_json + "\n")

    requests.put(url, data="".join(entries))

    logging.debug("Adding geoloc to ES Done")
开发者ID:pombredanne,项目名称:GrimoireELK,代码行数:32,代码来源:github.py
示例14: put
def put(self, file_path, environmentObj, container, environment, act_as_user):
    """PUT the job definition stored in ``file_path`` to the Chronos endpoint.

    The job is sent to ``scheduler/dependency`` when its JSON has a
    ``parents`` key and to ``scheduler/iso8601`` otherwise. When
    ``act_as_user`` is truthy it is forwarded in an ``act-as-user`` header.

    Returns ``(resp, task_id)``, where ``task_id`` is a list holding the
    job's ``name`` if it has one and is empty otherwise.
    """
    self.fetchUserPass(environment)
    # Close the file after reading; the original left the handle open.
    with open(file_path) as job_file:
        data = job_file.read()
    # Parse once and reuse for both the routing decision and the task id.
    body = json.loads(data)
    if 'parents' in body:
        chronos_resource = "scheduler/dependency"
    else:
        chronos_resource = "scheduler/iso8601"

    print(colored("TRIGGERING CHRONOS FRAMEWORK UPDATE FOR JOB: {}".format(container), "cyan"))
    print(colored("curl -X PUT -H 'Content-type: application/json' --data-binary @{} {}/{}".format(
        file_path, environmentObj['chronos_endpoint'], chronos_resource), "cyan"))
    endpoint = environmentObj['chronos_endpoint']
    deploy_url = "{}/{}".format(endpoint, chronos_resource)

    headers = {'Content-type': 'application/json'}
    if act_as_user:
        headers['act-as-user'] = act_as_user
    resp = requests.put(deploy_url, data=data,
                        headers=headers,
                        auth=(self.user, self.passw),
                        allow_redirects=True)
    chronos_message = "{}".format(resp)
    print(colored(chronos_message, "yellow"))

    task_id = []
    if 'name' in body:
        task_id.append(body['name'])
    return resp, task_id
开发者ID:seomoz,项目名称:roger-mesos-tools,代码行数:31,代码来源:chronos.py
示例15: upload_image
def upload_image(self, image_id, parent_id, token):
    """Upload an image's JSON and a random 7 MiB layer to the registry.

    Both PUTs are asserted to return 200, and cookies are refreshed after
    each one. Returns ``{'id': image_id, 'checksum': <sha256 checksum>}``,
    where the checksum covers the JSON document, a newline and the layer.
    """
    layer = self.gen_random_string(7 * 1024 * 1024)

    description = {'id': image_id}
    if parent_id:
        description['parent'] = parent_id
    json_data = json.dumps(description)

    # Checksum = sha256(json + '\n' + layer)
    digest = hashlib.sha256()
    digest.update(json_data + '\n')
    digest.update(layer)
    layer_checksum = 'sha256:{0}'.format(digest.hexdigest())

    json_url = '{0}/v1/images/{1}/json'.format(self.registry_endpoint, image_id)
    resp = requests.put(
        json_url,
        data=json_data,
        headers={'Authorization': 'Token ' + token,
                 'X-Docker-Checksum': layer_checksum},
        cookies=self.cookies)
    self.assertEqual(resp.status_code, 200, resp.text)
    self.update_cookies(resp)

    layer_url = '{0}/v1/images/{1}/layer'.format(self.registry_endpoint, image_id)
    resp = requests.put(
        layer_url,
        data=self.generate_chunk(layer),
        headers={'Authorization': 'Token ' + token},
        cookies=self.cookies)
    self.assertEqual(resp.status_code, 200, resp.text)
    self.update_cookies(resp)

    return {'id': image_id, 'checksum': layer_checksum}
开发者ID:23critters,项目名称:docker-registry,代码行数:27,代码来源:workflow.py
示例16: put
def put(self, id=None):
    """Edit the domain ``id`` and PUT its marshalled form to its controller URL.

    NOTE(review): indentation was lost in extraction; the nesting below
    (edit, PUT and return all inside the permission check) is a
    reconstruction -- confirm against the original file. With this nesting
    the method returns None when the permission check fails.
    """
    domain = Domain.query.get(id)
    # NOTE(review): getattr() is called without a default, so a domain whose
    # domain_controller is None would raise AttributeError here, before the
    # None-handling branches below are reached -- confirm this is intended.
    if current_user.has_permission(
        EditDomainPermission,
        getattr(domain.domain_controller, 'id')
    ):
        if 'domain_controller' in request.json:
            # The three branches below each call self._delete_on_dc(domain);
            # between them they cover every combination except "current and
            # requested controller are both None".

            # If the controller is to be changed in the _edit,
            # Delete the domain on the current controller
            if domain.domain_controller is not None and\
                    request.json['domain_controller'] is not None:
                self._delete_on_dc(domain)
            # If the domain is currently on the default controller and the
            # new controller is expected to be different, delete it on the
            # default controller
            if domain.domain_controller is None and\
                    request.json['domain_controller'] is not None:
                self._delete_on_dc(domain)
            # If we are changing the controller to be the default one
            if domain.domain_controller is not None and\
                    request.json['domain_controller'] is None:
                self._delete_on_dc(domain)
        # Apply the edit, then push the updated domain to its controller.
        domain = self._editDomain(id)
        req.put(
            '{}/{}'.format(self._get_url(domain), id),
            headers=self._get_headers(),
            data=json.dumps(marshal(domain, domain_fields)),
            verify=self._get_verify(domain)
        )
        return self.get(domain.id)
开发者ID:floe-charest,项目名称:jeto,代码行数:35,代码来源:domains.py
示例17: docker_push
def docker_push(self):
    """Simulate a push: register the repository, upload two images, tag, finalize.

    Stores the generated ids in ``self.image_id`` / ``self.parent_id`` and
    returns ``(namespace, repos)``.
    """
    # Test Push
    self.image_id = self.gen_random_string()
    self.parent_id = self.gen_random_string()
    image_id, parent_id = self.image_id, self.parent_id
    namespace = self.user_credentials[0]
    repos = self.gen_random_string()

    # Docker -> Index: announce the images and obtain a token.
    announce_url = '{0}/v1/repositories/{1}/{2}/'.format(
        self.index_endpoint, namespace, repos)
    resp = requests.put(
        announce_url,
        auth=tuple(self.user_credentials),
        headers={'X-Docker-Token': 'true'},
        data=json.dumps([{'id': image_id}, {'id': parent_id}]))
    self.assertEqual(resp.status_code, 200, resp.text)
    token = resp.headers.get('x-docker-token')

    # Docker -> Registry: parent first, then the image that depends on it.
    uploaded = [
        self.upload_image(parent_id, None, token),
        self.upload_image(image_id, parent_id, token),
    ]
    # Updating the tags does not need a token, it will use the Cookie
    self.update_tag(namespace, repos, image_id, 'latest')

    # Docker -> Index: report the uploaded images and their checksums.
    finalize_url = '{0}/v1/repositories/{1}/{2}/images'.format(
        self.index_endpoint, namespace, repos)
    resp = requests.put(
        finalize_url,
        auth=tuple(self.user_credentials),
        headers={'X-Endpoints': 'registrystaging-docker.dotcloud.com'},
        data=json.dumps(uploaded))
    self.assertEqual(resp.status_code, 204)
    return (namespace, repos)
开发者ID:23critters,项目名称:docker-registry,代码行数:31,代码来源:workflow.py
示例18: switch_off_on
def switch_off_on(lightId):
    """Switch the light ``lightId`` off and, half a second later, on again."""
    state_url = lightPath + lightId + '/state'
    requests.put(state_url, data='{"on": false}')
    time.sleep(0.5)
    requests.put(state_url, data='{"on": true}')
开发者ID:jeurgen,项目名称:ring-lights,代码行数:7,代码来源:lightring.py
示例19: changeProfilePicture
def changeProfilePicture(self,photo):
    """Upload ``photo`` as the user's avatar and refresh ``self.info``.

    photo: path of the image file; also sent as the upload's file name.

    Returns True when the server answers with ``requests.codes.ok`` and the
    returned ``avatar`` differs from the previous one. Returns False when the
    avatar is unchanged or the request fails. ``self.info`` is replaced by
    the decoded JSON body whenever the request succeeds.
    """
    headers = {
        "accept" : "application/json",
        "accept-language" : "en",
        "connection" : "keep-alive",
        "x-user-authentication-token" : self.authToken,
        "x-client-version" : self.clientVersion,
        "x-device-id" : self.deviceId,
        "user-agent" : self.useragent
    }
    # Keep the file open only for the duration of the upload; the original
    # never closed it. proxies=None is the requests default, so one call
    # covers both the proxied and the direct case.
    with open(photo, 'rb') as image:
        r = requests.put(self.base + "/users/" + self.info['_id'],
                         headers=headers,
                         files={'user[avatar]': (photo, image)},
                         proxies=self.proxies)
    if r.status_code != requests.codes.ok:
        return False
    old_avatar = self.info['avatar']
    # r.text is already decoded text; calling .decode('utf8') on it breaks on
    # non-ASCII content (Python 2) or raises AttributeError (Python 3).
    tmpinfo = json.loads(r.text)
    changed = tmpinfo['avatar'] != old_avatar
    self.info = tmpinfo
    return changed
开发者ID:Pilfer,项目名称:GifboomAPI,代码行数:30,代码来源:gifboom.py
示例20: test_request_with_id_member
def test_request_with_id_member(self):
"""
ensure id and members are not allowed in payload
"""
response = requests.get(__url__)
value = response.json()
if value:
db_length = len(value['databases'])
last_db_id = value['databases'][db_length-1]['id']
print 'Database id to be updated is ' + str(last_db_id)
url = __url__ + str(last_db_id)
response = requests.put(url, json={'name': 'test', 'members': [3]})
value = response.json()
self.assertEqual(value['error'], 'You cannot specify \'Members\' while updating database.')
self.assertEqual(response.status_code, 404)
response = requests.put(url, json={'name': 'test', 'id': 33333})
value = response.json()
self.assertEqual(value['error'], 'Database Id mentioned in the payload and url doesn\'t match.')
self.assertEqual(response.status_code, 404)
response = requests.put(url, json={'name': 'test123', 'id': last_db_id})
value = response.json()
self.assertEqual(value['status'], 200)
self.assertEqual(response.status_code, 200)
开发者ID:AdvEnc,项目名称:voltdb,代码行数:27,代码来源:database_test.py
注:本文中的requests.put函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论