• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python json.loads函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中swift.common.utils.json.loads函数的典型用法代码示例。如果您正苦于以下问题:Python loads函数的具体用法?Python loads怎么用?Python loads使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了loads函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _handle_sync_response

 def _handle_sync_response(self, node, response, info, broker, http,
                           different_region=False):
     """
     React to the remote node's answer to a sync request.

     :param node: remote node, passed through to ``_rsync_db``
     :param response: remote response; ``status`` and ``data`` are read
     :param info: local database info ('id', 'max_row' and 'count' are used)
     :param broker: local database broker
     :param http: connection object, passed through to the sync helpers
     :param different_region: forwarded to ``_rsync_db``
     :returns: True when ``_in_sync`` reports the databases agree,
               otherwise whatever ``_rsync_db`` / ``_usync_db`` returns;
               None for any status that is not handled here
     :raises DriveNotMounted: when the remote reports insufficient storage
     """
     status = response.status
     if status == HTTP_NOT_FOUND:
         # Nothing on the remote side yet, so ship the whole database.
         self.stats['rsync'] += 1
         self.logger.increment('rsyncs')
         return self._rsync_db(
             broker, node, http, info['id'],
             different_region=different_region)
     if status == HTTP_INSUFFICIENT_STORAGE:
         raise DriveNotMounted()
     if not (200 <= status < 300):
         return None

     remote_info = json.loads(response.data)
     local_sync = broker.get_sync(remote_info['id'], incoming=False)
     if remote_info.get('metadata', ''):
         broker.update_metadata(json.loads(remote_info['metadata']))
     if self._in_sync(remote_info, info, broker, local_sync):
         return True

     # Fall back to rsync followed by a remote merge only when the remote
     # holds fewer than half of our rows AND the gap exceeds per_diff; the
     # second test keeps small databases, which have only a few rows to
     # sync, on the cheaper diff path.
     remote_ratio = remote_info['max_row'] / float(info['max_row'])
     if (remote_ratio < 0.5 and
             info['max_row'] - remote_info['max_row'] > self.per_diff):
         self.stats['remote_merge'] += 1
         self.logger.increment('remote_merges')
         return self._rsync_db(
             broker, node, http, info['id'],
             replicate_method='rsync_then_merge',
             replicate_timeout=(info['count'] / 2000),
             different_region=different_region)

     # Otherwise push row diffs to the remote server.
     sync_point = max(remote_info['point'], local_sync)
     return self._usync_db(sync_point, broker, http,
                           remote_info['id'], info['id'])
开发者ID:matthewoliver,项目名称:swift,代码行数:32,代码来源:db_replicator.py


示例2: test_handle_multipart_put_check_data_bad

    def test_handle_multipart_put_check_data_bad(self):
        """Check the error list returned for a manifest with bad segments."""
        segments = [
            {"path": "/checktest/a_1", "etag": "a", "size_bytes": "2"},
            {"path": "/checktest/badreq", "etag": "a", "size_bytes": "1"},
            {"path": "/checktest/b_2", "etag": "not-b", "size_bytes": "2"},
            {"path": "/checktest/slob", "etag": "not-slob", "size_bytes": "2"},
        ]
        req = Request.blank(
            "/v1/AUTH_test/checktest/man?multipart-manifest=put",
            environ={"REQUEST_METHOD": "PUT"},
            headers={"Accept": "application/json"},
            body=json.dumps(segments),
        )

        status, headers, body = self.call_slo(req)
        self.assertEquals(self.app.call_count, 4)
        errors = json.loads(body)["Errors"]
        self.assertEquals(len(errors), 5)

        # (path, reason) pairs in the order they are expected to be reported.
        expected = [
            ("/checktest/a_1", "Size Mismatch"),
            ("/checktest/badreq", "400 Bad Request"),
            ("/checktest/b_2", "Etag Mismatch"),
            ("/checktest/slob", "Size Mismatch"),
            ("/checktest/slob", "Etag Mismatch"),
        ]
        for position, (path, reason) in enumerate(expected):
            self.assertEquals(errors[position][0], path)
            self.assertEquals(errors[position][1], reason)
开发者ID:jettang,项目名称:icehouse,代码行数:30,代码来源:test_slo.py


示例3: update_data_record

    def update_data_record(self, record, list_meta=False):
        """
        Convert a raw container listing row into a response dict, applying
        the changes shared by every serialization format.

        The created time becomes an ISO timestamp with microseconds, and
        ``override_bytes_from_content_type`` is applied to the finished dict.

        :param record: 6-tuple of (name, created, size, content_type, etag,
                       metadata)
        :param list_meta: when true, the metadata column is JSON-decoded
                          and included under the 'metadata' key
        :returns: ``{'subdir': name}`` when content_type is None, otherwise
                  the populated listing dict
        """
        name, created, size, content_type, etag, metadata = record
        if content_type is None:
            return {'subdir': name}

        entry = {
            'bytes': size,
            'hash': etag,
            'name': name,
            'content_type': content_type,
        }
        if list_meta:
            decoded = json.loads(metadata)
            utf8encodekeys(decoded)
            entry['metadata'] = decoded

        stamp = datetime.utcfromtimestamp(float(created)).isoformat()
        # isoformat() omits the fractional part when microseconds are zero,
        # so pad it to keep the timestamp width constant.
        if len(stamp) < len("1970-01-01T00:00:00.000000"):
            stamp = stamp + ".000000"
        entry['last_modified'] = stamp

        override_bytes_from_content_type(entry, logger=self.logger)
        return entry
开发者ID:pkit,项目名称:zwift,代码行数:27,代码来源:server.py


示例4: test_extract_tar_works

    def test_extract_tar_works(self):
        """Extract a tar (plain, gz, bz2) and check each response format."""
        # On systems where $TMPDIR is long (like OS X), we need to do this
        # or else every upload will fail due to the path being too long.
        self.app.max_pathlen += len(self.testdir)

        def fake_start_response(*args, **kwargs):
            pass

        for compress_format in ['', 'gz', 'bz2']:
            base_name = 'base_works_%s' % compress_format
            dir_tree = [
                {base_name: [{'sub_dir1': ['sub1_file1', 'sub1_file2']},
                             {'sub_dir2': ['sub2_file1', u'test obj \u2661']},
                             'sub_file1',
                             {'sub_dir3': [{'sub4_dir1': '../sub4 file1'}]},
                             {'sub_dir4': None},
                             ]}]
            build_dir_tree(self.testdir, dir_tree)

            mode = 'w:' + compress_format if compress_format else 'w'
            extension = '.' + compress_format if compress_format else ''
            tar_path = os.path.join(self.testdir,
                                    'tar_works.tar' + extension)
            tar = tarfile.open(name=tar_path, mode=mode)
            tar.add(os.path.join(self.testdir, base_name))
            tar.close()

            # default (JSON) response
            req = Request.blank('/tar_works/acc/cont/')
            req.environ['wsgi.input'] = open(tar_path)
            req.headers['transfer-encoding'] = 'chunked'
            resp_body = self.handle_extract_and_iter(req, compress_format)
            resp_data = json.loads(resp_body)
            self.assertEquals(resp_data['Number Files Created'], 6)

            # test out xml
            req = Request.blank('/tar_works/acc/cont/')
            req.environ['wsgi.input'] = open(tar_path)
            req.headers['transfer-encoding'] = 'chunked'
            resp_body = self.handle_extract_and_iter(
                req, compress_format, 'application/xml')
            self.assert_(
                '<response_status>201 Created</response_status>' in resp_body)
            self.assert_(
                '<number_files_created>6</number_files_created>' in resp_body)

            # test out nonexistent format
            req = Request.blank('/tar_works/acc/cont/?extract-archive=tar',
                                headers={'Accept': 'good_xml'})
            req.environ['REQUEST_METHOD'] = 'PUT'
            req.environ['wsgi.input'] = open(tar_path)
            req.headers['transfer-encoding'] = 'chunked'

            app_iter = self.bulk(req.environ, fake_start_response)
            resp_body = ''.join([chunk for chunk in app_iter])

            self.assert_('Response Status: 406' in resp_body)
开发者ID:BlueSkyChina,项目名称:swift,代码行数:60,代码来源:test_bulk.py


示例5: get_or_head_response

    def get_or_head_response(self, req, resp_headers, resp_iter):
        """
        Build the GET or HEAD response for a manifest.

        The manifest body is read from ``resp_iter`` and JSON-decoded; a
        body that fails to decode is treated as an empty segment list. The
        Etag and Content-Length headers are recomputed from the segments.

        :param req: the incoming request; only ``method`` is read here
        :param resp_headers: iterable of (name, value) header pairs
        :param resp_iter: iterable yielding the manifest body
        :returns: result of ``_manifest_head_response`` for HEAD requests,
                  of ``_manifest_get_response`` otherwise
        """
        with closing_if_possible(resp_iter):
            manifest_json = ''.join(resp_iter)
        try:
            segments = json.loads(manifest_json)
        except ValueError:
            segments = []

        hasher = md5()
        total_bytes = 0
        for segment in segments:
            seg_range = segment.get('range')
            if seg_range:
                hasher.update('%s:%s;' % (segment['hash'], seg_range))
            else:
                hasher.update(segment['hash'])

            if config_true_value(segment.get('sub_slo')):
                override_bytes_from_content_type(
                    segment, logger=self.slo.logger)
            total_bytes += self._segment_length(segment)

        # Drop the stored Etag / Content-Length; the computed ones replace them.
        response_headers = []
        for header, value in resp_headers:
            if header.lower() in ('etag', 'content-length'):
                continue
            response_headers.append((header, value))
        response_headers.append(('Content-Length', str(total_bytes)))
        response_headers.append(('Etag', '"%s"' % hasher.hexdigest()))

        if req.method == 'HEAD':
            return self._manifest_head_response(req, response_headers)
        return self._manifest_get_response(
            req, total_bytes, response_headers, segments)
开发者ID:bkolli,项目名称:swift,代码行数:31,代码来源:slo.py


示例6: test_handle_multipart_put_check_data_bad

 def test_handle_multipart_put_check_data_bad(self):
     """
     A manifest whose segments fail validation must make
     ``handle_multipart_put`` raise an HTTPException whose JSON body
     lists the per-segment errors.
     """
     bad_data = json.dumps(
         [
             {"path": "/c/a_1", "etag": "a", "size_bytes": "1"},
             {"path": "/c/a_2", "etag": "a", "size_bytes": "1"},
             {"path": "/d/b_2", "etag": "b", "size_bytes": "2"},
         ]
     )
     req = Request.blank(
         "/test_good/A/c/man?multipart-manifest=put",
         environ={"REQUEST_METHOD": "PUT"},
         headers={"Accept": "application/json"},
         body=bad_data,
     )
     try:
         self.slo.handle_multipart_put(req)
     except HTTPException as e:
         # "as" form is valid on Python 2.6+ and Python 3, unlike ", e".
         self.assertEquals(self.app.calls, 3)
         data = json.loads(e.body)
         errors = data["Errors"]
         self.assertEquals(errors[0][0], "/test_good/A/c/a_1")
         self.assertEquals(errors[0][1], "Size Mismatch")
         self.assertEquals(errors[2][1], "400 Bad Request")
         self.assertEquals(errors[-1][0], "/test_good/A/d/b_2")
         self.assertEquals(errors[-1][1], "Etag Mismatch")
     else:
         # Without this the test would pass vacuously when nothing is raised.
         self.fail("handle_multipart_put did not raise HTTPException")
开发者ID:mygoda,项目名称:openstack,代码行数:25,代码来源:test_slo.py


示例7: _handle_sync_response

 def _handle_sync_response(self, node, response, info, broker, http,
                           different_region=False):
     """
     React to the remote node's answer to a sync request.

     :param node: remote node, passed through to ``_rsync_db``
     :param response: remote response; ``status`` and ``data`` are read
     :param info: local database info ('id', 'max_row' and 'count' are used)
     :param broker: local database broker
     :param http: connection object, passed through to the sync helpers
     :param different_region: forwarded to ``_rsync_db``
     :returns: True when ``_in_sync`` reports the databases agree,
               otherwise whatever ``_rsync_db`` / ``_usync_db`` returns;
               None for any status that is not handled here
     :raises DriveNotMounted: when the remote reports insufficient storage
     """
     status = response.status
     if status == HTTP_NOT_FOUND:
         # Nothing on the remote side yet, so ship the whole database.
         self.stats['rsync'] += 1
         self.logger.increment('rsyncs')
         return self._rsync_db(
             broker, node, http, info['id'],
             different_region=different_region)
     if status == HTTP_INSUFFICIENT_STORAGE:
         raise DriveNotMounted()
     if status < 200 or status >= 300:
         return None

     remote_info = json.loads(response.data)
     local_sync = broker.get_sync(remote_info['id'], incoming=False)
     if self._in_sync(remote_info, info, broker, local_sync):
         return True

     # When the remote holds fewer than half of our rows, rsync the
     # database and let the remote merge it.
     remote_ratio = remote_info['max_row'] / float(info['max_row'])
     if remote_ratio < 0.5:
         self.stats['remote_merge'] += 1
         self.logger.increment('remote_merges')
         return self._rsync_db(
             broker, node, http, info['id'],
             replicate_method='rsync_then_merge',
             replicate_timeout=(info['count'] / 2000),
             different_region=different_region)

     # Otherwise push row diffs to the remote server.
     sync_point = max(remote_info['point'], local_sync)
     return self._usync_db(sync_point, broker, http,
                           remote_info['id'], info['id'])
开发者ID:hoangv2,项目名称:swift,代码行数:26,代码来源:db_replicator.py


示例8: deserialize_v1

    def deserialize_v1(cls, gz_file, metadata_only=False):
        """
        Read a v1 ring file and return its contents as a dictionary.

        The stream holds a 4-byte big-endian length, that many bytes of
        JSON, and then one array of unsigned 16-bit values per replica.

        :param file gz_file: opened file-like object positioned just after
                             the 6 bytes of magic and version
        :param bool metadata_only: if True, skip the per-replica arrays and
                                   leave `replica2part2dev_id` as `[]`
        :returns: the decoded JSON dict with a `replica2part2dev_id` key
                  added
        """
        (json_len,) = struct.unpack('!I', gz_file.read(4))
        ring_dict = json.loads(gz_file.read(json_len))
        rows = ring_dict['replica2part2dev_id'] = []

        if not metadata_only:
            partition_count = 1 << (32 - ring_dict['part_shift'])
            # Each entry is an unsigned short, i.e. two bytes per partition.
            row_bytes = 2 * partition_count
            for _ in xrange(ring_dict['replica_count']):
                rows.append(array.array('H', gz_file.read(row_bytes)))
        return ring_dict
开发者ID:linkedinyou,项目名称:swift,代码行数:28,代码来源:ring.py


示例9: get_slo_segments

    def get_slo_segments(self, obj_name, req):
        """
        Issue an internal GET for an SLO manifest and return its segments.

        :param obj_name: container/object path of the manifest; leading
                         slashes are stripped before the path is built
        :param req: the original request, used for its path and environ
        :raises HTTPServerError: on unable to load obj_name or
                                 on unable to load the SLO manifest data.
        :raises HTTPBadRequest: on not an SLO manifest
        :raises HTTPNotFound: on SLO manifest not found
        :raises HTTPUnauthorized: when the sub-request returns 401
        :returns: SLO manifest's segments
        """
        vrs, account, _junk = req.split_path(2, 3, True)

        sub_env = req.environ.copy()
        del sub_env["wsgi.input"]
        sub_env.update({
            "REQUEST_METHOD": "GET",
            "QUERY_STRING": "multipart-manifest=get",
            "CONTENT_LENGTH": 0,
            # Evaluated before the update, so this reads the original agent.
            "HTTP_USER_AGENT": "%s MultipartDELETE" % sub_env.get("HTTP_USER_AGENT"),
            "swift.source": "SLO",
            "PATH_INFO": ("/%s/%s/%s" % (vrs, account, obj_name.lstrip("/"))).encode("utf-8"),
        })
        resp = Request.blank("", sub_env).get_response(self.app)

        # Deal with failures first so the success path reads straight down.
        if not resp.is_success:
            if resp.status_int == HTTP_NOT_FOUND:
                raise HTTPNotFound("SLO manifest not found")
            if resp.status_int == HTTP_UNAUTHORIZED:
                raise HTTPUnauthorized("401 Unauthorized")
            raise HTTPServerError("Unable to load SLO manifest or segment.")

        if not config_true_value(resp.headers.get("X-Static-Large-Object")):
            raise HTTPBadRequest("Not an SLO manifest")
        try:
            return json.loads(resp.body)
        except ValueError:
            raise HTTPServerError("Unable to load SLO manifest")
开发者ID:pchng,项目名称:swift,代码行数:35,代码来源:slo.py


示例10: get_or_head_response

    def get_or_head_response(self, req, resp_headers, resp_iter):
        """
        Build the GET or HEAD response for a manifest.

        The manifest body is read from ``resp_iter`` and JSON-decoded; a
        body that fails to decode is treated as an empty segment list. The
        Etag and Content-Length headers are recomputed from the segments.

        :param req: the incoming request; only ``method`` is read here
        :param resp_headers: iterable of (name, value) header pairs
        :param resp_iter: iterable yielding the manifest body
        :returns: result of ``_manifest_head_response`` for HEAD requests,
                  of ``_manifest_get_response`` otherwise
        """
        with closing_if_possible(resp_iter):
            raw_manifest = "".join(resp_iter)
        try:
            segments = json.loads(raw_manifest)
        except ValueError:
            segments = []

        digest = md5()
        content_length = 0
        for entry in segments:
            byte_range = entry.get("range")
            piece = "%s:%s;" % (entry["hash"], byte_range) if byte_range else entry["hash"]
            digest.update(piece)

            if config_true_value(entry.get("sub_slo")):
                override_bytes_from_content_type(entry, logger=self.slo.logger)
            content_length += self._segment_length(entry)

        # The stored Etag / Content-Length are replaced by the computed ones.
        replaced = ("etag", "content-length")
        response_headers = [pair for pair in resp_headers if pair[0].lower() not in replaced]
        response_headers += [
            ("Content-Length", str(content_length)),
            ("Etag", '"%s"' % digest.hexdigest()),
        ]

        if req.method != "HEAD":
            return self._manifest_get_response(req, content_length, response_headers, segments)
        return self._manifest_head_response(req, response_headers)
开发者ID:pchng,项目名称:swift,代码行数:28,代码来源:slo.py


示例11: _fetch_sub_slo_segments

    def _fetch_sub_slo_segments(self, req, version, acc, con, obj):
        """
        GET a sub-manifest and return its JSON-decoded contents.

        :raises ListingIterError: when the GET is not successful or the
                                  body is not valid JSON
        """
        sub_path = "/".join(("", version, acc, con, obj))
        sub_req = make_subrequest(
            req.environ,
            path=sub_path,
            method="GET",
            headers={"x-auth-token": req.headers.get("x-auth-token")},
            agent="%(orig)s SLO MultipartGET",
            swift_source="SLO",
        )
        sub_resp = sub_req.get_response(self.slo.app)

        if not is_success(sub_resp.status_int):
            close_if_possible(sub_resp.app_iter)
            message = (
                "ERROR: while fetching %s, GET of submanifest %s "
                "failed with status %d"
            ) % (req.path, sub_req.path, sub_resp.status_int)
            raise ListingIterError(message)

        try:
            with closing_if_possible(sub_resp.app_iter):
                manifest_json = "".join(sub_resp.app_iter)
                return json.loads(manifest_json)
        except ValueError as err:
            message = (
                "ERROR: while fetching %s, JSON-decoding of submanifest %s "
                "failed with %s"
            ) % (req.path, sub_req.path, err)
            raise ListingIterError(message)
开发者ID:pchng,项目名称:swift,代码行数:30,代码来源:slo.py


示例12: parse_input

def parse_input(raw_data):
    """
    Decode a JSON manifest body and validate each segment entry.

    Every entry must carry the keys path, etag and size_bytes, may carry
    range, and its path must contain a "/" after leading slashes are
    stripped. A truthy range is replaced by a ``Range`` object.

    :raises: HTTPException on parse errors
    :returns: a list of dictionaries on success
    """
    try:
        manifest = json.loads(raw_data)
    except ValueError:
        raise HTTPBadRequest("Manifest must be valid json.")

    required = {"path", "etag", "size_bytes"}
    allowed = required | {"range"}
    try:
        for segment in manifest:
            keys = set(segment)
            if not required <= keys or not keys <= allowed:
                raise HTTPBadRequest("Invalid SLO Manifest File")
            if "/" not in segment["path"].lstrip("/"):
                raise HTTPBadRequest("Invalid SLO Manifest File")

            byte_range = segment.get("range")
            if byte_range:
                try:
                    segment["range"] = Range("bytes=%s" % byte_range)
                except ValueError:
                    raise HTTPBadRequest("Invalid SLO Manifest File")
    except (AttributeError, TypeError):
        # Entries that are not dicts, or a manifest that is not iterable.
        raise HTTPBadRequest("Invalid SLO Manifest File")

    return manifest
开发者ID:pchng,项目名称:swift,代码行数:27,代码来源:slo.py


示例13: test_direct_get_account

    def test_direct_get_account(self):
        """direct_get_account issues a GET and returns the decoded listing."""
        header_values = {
            'X-Account-Container-Count': '1',
            'X-Account-Object-Count': '1',
            'X-Account-Bytes-Used': '1',
            'X-Timestamp': '1234567890',
            'X-PUT-Timestamp': '1234567890',
        }
        stub_headers = HeaderKeyDict(header_values)
        body = '[{"count": 1, "bytes": 20971520, "name": "c1"}]'

        with mocked_http_conn(200, stub_headers, body) as conn:
            resp_headers, resp = direct_client.direct_get_account(
                self.node, self.part, self.account,
                marker='marker', prefix='prefix',
                delimiter='delimiter', limit=1000)
            self.assertEqual(conn.method, 'GET')
            self.assertEqual(conn.path, self.account_path)

        self.assertEqual(conn.req_headers['user-agent'], self.user_agent)
        self.assertEqual(resp_headers, stub_headers)
        self.assertEqual(json.loads(body), resp)

        # Every listing option must show up in the query string.
        expected_params = ('marker=marker', 'delimiter=delimiter',
                           'limit=1000', 'prefix=prefix', 'format=json')
        for param in expected_params:
            self.assertTrue(param in conn.query_string)
开发者ID:LSBDOpenstackDev,项目名称:swift,代码行数:25,代码来源:test_direct_client.py


示例14: _listing_pages_iter

 def _listing_pages_iter(self, lcontainer, lprefix, env):
     """
     Yield successive pages of a container listing as decoded JSON lists.

     Each page is fetched with a GET whose marker is the name of the last
     entry of the previous page; iteration stops at the first empty page.

     :param lcontainer: container to list
     :param lprefix: prefix filter placed in the query string
     :param env: WSGI environ used as the base for each listing request
     :raises ListingIterNotAuthorized: when env's 'swift.authorize' callback
                                       returns a response
     :raises ListingIterNotFound: when the listing GET returns 404
     :raises ListingIterError: on any other unsuccessful status
     """
     lpartition = self.app.container_ring.get_part(self.account_name, lcontainer)
     marker = ""
     while True:
         lreq = Request.blank("i will be overridden by env", environ=env)
         # Don't quote PATH_INFO, by WSGI spec
         lreq.environ.update({
             "PATH_INFO": "/v1/%s/%s" % (self.account_name, lcontainer),
             "REQUEST_METHOD": "GET",
             "QUERY_STRING": "format=json&prefix=%s&marker=%s" % (quote(lprefix), quote(marker)),
         })
         lresp = self.GETorHEAD_base(
             lreq, _("Container"), self.app.container_ring, lpartition, lreq.swift_entity_path
         )

         if "swift.authorize" in env:
             lreq.acl = lresp.headers.get("x-container-read")
             denied = env["swift.authorize"](lreq)
             if denied:
                 raise ListingIterNotAuthorized(denied)

         if lresp.status_int == HTTP_NOT_FOUND:
             raise ListingIterNotFound()
         if not is_success(lresp.status_int):
             raise ListingIterError()

         if not lresp.body:
             return
         page = json.loads(lresp.body)
         if not page:
             return
         # The next request resumes after the last name on this page.
         marker = page[-1]["name"].encode("utf-8")
         yield page
开发者ID:krishna-kashyap,项目名称:swift,代码行数:28,代码来源:obj.py


示例15: __call__

 def __call__(self, req):
     """
     Handle a request, optionally rendering JSON listings via a template.

     Requests without an account in the path or without an 'x-web-mode'
     header are passed straight to the wrapped app. For a JSON response,
     a listing that contains 'index.html' produces a redirect to it;
     otherwise the listing is rendered with the template when one exists.
     """
     account = None
     try:
         version, account, container, obj = split_path(
             req.path_info, 2, 4, True)
     except ValueError:
         pass
     if not (account and req.headers.get('x-web-mode')):
         return req.get_response(self.app)

     if not obj:
         req.query_string = 'format=json'
     resp = req.get_response(self.app)
     if resp.content_type != 'application/json':
         return resp

     listing = json.loads(resp.body)
     template = self.get_template(req, account, container)

     # index.html is looked up when there is no template at all, or when a
     # template exists and the request names a container.
     if not template or container:
         index = [o for o in listing if o['name'] == 'index.html']
         if index:
             location = '/v1/%s/%s/index.html' % (account, container)
             return HTTPSeeOther(headers={'Location': location})

     if template:
         return Response(body=template.render(
             account=account, container=container, listing=listing))
     return resp
开发者ID:clayg,项目名称:swift-webmode,代码行数:35,代码来源:webtemplates.py


示例16: test_GET_OBJscope_conAttrs_metadata

    def test_GET_OBJscope_conAttrs_metadata(self):
        """
        In object scope, specifying container attrs
        We should get back the container that the object
        belongs to
        """
        request_headers = {'attributes': Cattrs, 'format': 'json'}
        request_environ = {'REQUEST_METHOD': 'GET', 'HTTP_X_TIMESTAMP': '0'}
        req2 = Request.blank(
            '/v1/TEST_acc1/TEST_con1/TEST_obj1',
            environ=request_environ, headers=request_headers)
        resp2 = req2.get_response(self.controller)
        self.assert_(resp2.status.startswith('200'))

        results = json.loads(resp2.body)
        self.assert_(len(results) == 1)
        entry = results[0]
        self.assert_('/TEST_acc1/TEST_con1' in entry)
        returned = entry['/TEST_acc1/TEST_con1']

        # (attribute, expected value) in the order they are checked
        expected = [
            ('container_uri', '/TEST_acc1/TEST_con1'),
            ('container_name', 'TEST_con1'),
            ('container_account_name', 'TEST_acc1'),
            ('container_create_time', self.t),
            ('container_object_count', 33),
            ('container_bytes_used', 3342),
            ('container_meta_TESTCUSTOM', 'CUSTOM'),
        ]
        for attribute, value in expected:
            self.assertEquals(returned[attribute], value)
开发者ID:ucsc-hp-group,项目名称:swift,代码行数:27,代码来源:test_server.py


示例17: test_no_attributes_in_request_con_scope

    def test_no_attributes_in_request_con_scope(self):
        """A container-scope GET without attributes returns four entries."""
        request_environ = {'REQUEST_METHOD': 'GET', 'HTTP_X_TIMESTAMP': '0'}
        req = Request.blank(
            '/v1/TEST_acc1/TEST_con1',
            environ=request_environ, headers={'format': 'json'})
        resp = req.get_response(self.controller)
        self.assert_(resp.status.startswith('200'))
        results = json.loads(resp.body)
        self.assertEquals(len(results), 4)

        # Each entry is keyed by its URI and repeats that URI in one field.
        expected = [
            ('/TEST_acc1', 'account_uri'),
            ('/TEST_acc1/TEST_con1', 'container_uri'),
            ('/TEST_acc1/TEST_con1/TEST_obj1', 'object_uri'),
            ('/TEST_acc1/TEST_con1/TEST_obj2', 'object_uri'),
        ]
        for position, (uri, uri_field) in enumerate(expected):
            entry = results[position]
            self.assert_(uri in entry)
            returned = entry[uri]
            self.assertEquals(returned[uri_field], uri)
开发者ID:ucsc-hp-group,项目名称:swift,代码行数:32,代码来源:test_server.py


示例18: handle_multipart_delete

 def handle_multipart_delete(self, req):
     """
     Delete the objects listed in an SLO manifest.

     The manifest is fetched with an internal GET and the names it lists
     are handed to the bulk deleter.

     :returns: the manifest GET response when it is not 2xx, the bulk
               delete response when that is not 2xx, otherwise
               ``self.app``
     :raises HTTPBadRequest: when the object is not an SLO manifest
     :raises HTTPServerError: when the manifest body is not valid JSON
     """
     manifest_env = req.environ.copy()
     del manifest_env['wsgi.input']
     manifest_env.update({
         'REQUEST_METHOD': 'GET',
         'QUERY_STRING': 'multipart-manifest=get',
         'CONTENT_LENGTH': 0,
         'HTTP_USER_AGENT':
             '%s MultipartDELETE' % req.environ.get('HTTP_USER_AGENT'),
         'swift.source': 'SLO',
     })
     get_man_resp = Request.blank('', manifest_env).get_response(self.app)

     if get_man_resp.status_int // 100 != 2:
         return get_man_resp

     is_slo = config_true_value(
         get_man_resp.headers.get('X-Static-Large-Object'))
     if not is_slo:
         raise HTTPBadRequest('Not an SLO manifest')
     try:
         manifest = json.loads(get_man_resp.body)
     except ValueError:
         raise HTTPServerError('Invalid manifest file')

     names = [o['name'].encode('utf-8') for o in manifest]
     delete_resp = self.bulk_deleter.handle_delete(
         req, objs_to_delete=names,
         user_agent='MultipartDELETE', swift_source='SLO')
     if delete_resp.status_int // 100 != 2:
         return delete_resp
     # delete the manifest file itself
     return self.app
开发者ID:sun3shines,项目名称:swift-1.7.4,代码行数:31,代码来源:slo.py


示例19: _reclaim

    def _reclaim(self, conn, timestamp):
        """
        Removes any empty metadata values older than the timestamp using the
        given database connection. This function will not call commit on the
        conn, but will instead return True if the database needs committing.
        This function was created as a worker to limit transactions and commits
        from other related functions.

        :param conn: Database connection to reclaim metadata within.
        :param timestamp: Empty metadata items last updated before this
                          timestamp will be removed.
        :returns: True if conn.commit() should be called
        """
        try:
            md = conn.execute('SELECT metadata FROM %s_stat' %
                              self.db_type).fetchone()[0]
            if md:
                md = json.loads(md)
                keys_to_delete = []
                for key, (value, value_timestamp) in md.iteritems():
                    if value == '' and value_timestamp < timestamp:
                        keys_to_delete.append(key)
                if keys_to_delete:
                    for key in keys_to_delete:
                        del md[key]
                    conn.execute('UPDATE %s_stat SET metadata = ?' %
                                 self.db_type, (json.dumps(md),))
                    return True
        except sqlite3.OperationalError as err:
            if 'no such column: metadata' not in str(err):
                raise
        return False
开发者ID:HoO-Group,项目名称:swift,代码行数:32,代码来源:db.py


示例20: test_bulk_delete_container_delete

 def test_bulk_delete_container_delete(self):
     """A container delete that conflicts is reported as an error."""
     req = Request.blank(
         "/delete_cont_fail/AUTH_Acc",
         body="c\n",
         headers={"Accept": "application/json"},
     )
     req.method = "DELETE"
     result = json.loads(self.handle_delete_and_iter(req))
     self.assertEquals(result["Number Deleted"], 0)
     first_error = result["Errors"][0]
     self.assertEquals(first_error[1], "409 Conflict")
开发者ID:chmouel,项目名称:swift,代码行数:7,代码来源:test_bulk.py



注:本文中的swift.common.utils.json.loads函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python wsgi.appconfig函数代码示例发布时间:2022-05-27
下一篇:
Python json.load函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap