• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python escape.json_decode函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tornado.escape.json_decode函数的典型用法代码示例。如果您正苦于以下问题:Python json_decode函数的具体用法?Python json_decode怎么用?Python json_decode使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了json_decode函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: get

    async def get(self,
                  id: Union[int, str],
                  *,
                  organization_id: Union[str, int],
                  columns=None):
        url = '{base_url}/{id}?{query}'.format(
            base_url=self.module_url,
            id=id,
            query=urlencode({
                'organization_id': organization_id,
                **self.base_query}))

        try:
            logger.info('GET: {}'.format(url))
            response  = await self.http_client.fetch(url, method='GET')
        except HTTPClientError as http_error:
            http_code = http_error.code
            response  = json_decode(http_error.response.body.decode("utf-8"))
            message   = str(response['code']) + ': ' + response['message']
            raise HTTPError(http_code, reason=message)
        else:
            response  = json_decode(response.body.decode("utf-8"))
            results   = [v for k, v in response.items() if k not in ['code', 'message']]  # noqa

            if len(results) != 1:
                ValueError('More then one resource was returned.')

            return results[0]
开发者ID:i-Dynamics,项目名称:Talk-Zoho,代码行数:28,代码来源:base_resource.py


示例2: require_auth

        def require_auth(handler, kwargs):

            auth = handler.request.headers.get('Authorization')
            if auth:
                parts = auth.split()

                if parts[0].lower() != 'bearer':
                    handler._transforms = []
                    handler.writejson(json_decode(str(ApiHTTPError(10405))))
                    handler.finish()
                elif len(parts) == 1:
                    handler._transforms = []
                    handler.writejson(json_decode(str(ApiHTTPError(10405))))
                    handler.finish()
                elif len(parts) > 2:
                    handler._transforms = []
                    handler.writejson(json_decode(str(ApiHTTPError(10405))))
                    handler.finish()

                token = parts[1]
                try:
                    res = jwt.decode(
                        token,
                        secret_key,
                        options=options
                    )
                except Exception, e:
                    handler._transforms = []
                    handler.set_status(200)
                    handler.writejson({'message': e.message, 'code': 10416})
                    handler.finish()
开发者ID:yancyzhou,项目名称:Tornado_Restful,代码行数:31,代码来源:__init__.py


示例3: post

	def post(self):
		habrachat_cookie = self.get_cookie("habrachat")
		if not habrachat_cookie:
			habrachat_cookie = _session_id()
			self.set_cookie("habrachat", habrachat_cookie)

		token = self.get_argument("token", None)
		if not token:
			log.warning("Not have Token")
			self.finish()
			return
		client = httpclient.AsyncHTTPClient()
		response = yield client.fetch(
			"http://u-login.com/token.php?token=%s&host=%s://%s" % (token, self.request.protocol, self.request.host),
			use_gzip=True
		)
		if response.code != 200:
			log.warning("Not have access to u-login")
			self.finish()
			return

		json_response = json_decode(response.body)
		if "error_type" in json_response:
			log.warning("Error auth: %s" % json_response["error_message"])
			self.finish()
			return

		json_response = json_decode(response.body)
		if "error" in json_response:
			log.warning("Error auth: %s" % json_response["error"])
			self.finish()
			return

		identity = json_response.get("identity")
		if not identity:
			log.error("Not have indentity! json: %s" % json_response)
		log.info("New user indetity: %s" % identity)
		user_id = hashlib.md5(utf8(identity)).hexdigest()
		new_user = {"id": user_id, "name": None}

		new_user_name = ""
		if "nickname" in json_response:
			new_user_name = json_response.get("nickname", "")
		if not new_user["name"] and "first_name" in json_response:
			new_user_name = json_response.get("first_name", "")

		new_user["name"] = new_user_name[:20].replace("[", "{").replace("]", "}").encode('UTF-8')

		new_user["avatar"] = json_response.get("photo")
		new_user["ismoderator"] = identity in options.moderators

		old_user_settings = yield tornado.gen.Task(self.redis.get, "setting_"+user_id)
		if not old_user_settings:
			new_user_settings = {
				"revert_chat_order": False,
				"send_message_enter": False
			}
			yield tornado.gen.Task(self.redis.set, "setting_"+user_id,  json_encode(recursive_unicode(new_user_settings)))
		yield tornado.gen.Task(self.redis.set, habrachat_cookie,  json_encode(recursive_unicode(new_user)))
		self.redirect("/")
开发者ID:ve1ikiy,项目名称:habrachat,代码行数:60,代码来源:habrachat.py


示例4: post

    def post(self):
        ts_data = json_decode(self.get_argument('ts_data'))
        model_id = json_decode(self.get_argument('modelID'))
        meta_feats = json_decode(self.get_argument('meta_features', 'null'))
        impute_kwargs = json_decode(self.get_argument('impute_kwargs', '{}'))

        model = Model.query.get(model_id)
        model_data = joblib.load(model.file_uri)
        if hasattr(model_data, 'best_estimator_'):
            model_data = model_data.best_estimator_
        features_to_use = model.featureset.features_list

        fset = featurize.featurize_time_series(*ts_data,
                                               features_to_use=features_to_use,
                                               meta_features=meta_feats,
                                               raise_exceptions=False)
        fset = featurize.impute_featureset(fset, **impute_kwargs)
        fset.index = fset.index.astype(str)  # ensure JSON-encodable
        data = {'preds': model_data.predict(fset)}
        if hasattr(model_data, 'predict_proba'):
            data['pred_probs'] = pd.DataFrame(model_data.predict_proba(fset),
                                              index=fset.index,
                                              columns=model_data.classes_)
        else:
            data['pred_probs'] = []
        pred_info = Prediction.format_pred_data(fset, data)
        return self.success(pred_info)
开发者ID:cesium-ml,项目名称:cesium_web,代码行数:27,代码来源:prediction.py


示例5: get

    def get(self, article_id):
        logging.info("got article_id %r in uri", article_id)
        logging.info(self.request)

        url = "http://" + STP + "/blogs/articles/" + article_id
        http_client = HTTPClient()
        response = http_client.fetch(url, method="GET")
        logging.info("got _article response %r", response.body)
        _article = json_decode(response.body)
        _timestamp = _article["timestamp"]
        _datetime = timestamp_datetime(_timestamp / 1000)
        _article["timestamp"] = _datetime
        try:
            _article['accountNickname']
        except:
            _article['accountNickname'] = "anonymous"

        url = "http://" + STP + "/blogs/my-articles/" + article_id + "/paragraphs"
        http_client = HTTPClient()
        response = http_client.fetch(url, method="GET")
        logging.info("got _paragraphs response %r", response.body)
        _paragraphs = json_decode(response.body)

        self.render('blog/article-edit.html',
                article=_article,
                paragraphs=_paragraphs)
开发者ID:ThomasZh,项目名称:backbone,代码行数:26,代码来源:ui_blog.py


示例6: put

    def put(self, path, request):
        """
        Handle an HTTP PUT request.

        This method handles an HTTP PUT request, returning a JSON response.

        :param path: URI path of request
        :param request: HTTP request object
        :return: an ApiAdapterResponse object containing the appropriate response
        """
        # Update the target specified in the path, or all targets if none specified

        try:
            json_decode(request.body)  # ensure request body is JSON. Will throw a TypeError if not
            if "/" in path:
                path_elem, target_path = path.split('/', 1)
            else:
                path_elem = path
                target_path = ""
            for target in self.targets:
                if path_elem == '' or path_elem == target.name:
                    target.remote_set(target_path, request.body)
            response = self.param_tree.get(path)
            status_code = 200
        except ParameterTreeError as param_tree_err:
            response = {'error': str(param_tree_err)}
            status_code = 400
        except (TypeError, ValueError) as type_val_err:
            response = {'error': 'Failed to decode PUT request body: {}'.format(str(type_val_err))}
            status_code = 415

        return ApiAdapterResponse(response, status_code=status_code)
开发者ID:percival-detector,项目名称:odin,代码行数:32,代码来源:proxy.py


示例7: run

	def run(self):
		book = self.book
		lock = self.lock
		whoToAsk = _setWhoToAsk(self.user, self.whoToAsk, self.book, self.connectAll, self.maxConnection)
		
		http_client = tornado.httpclient.HTTPClient()
		_body = 'userName=' + escape.url_escape(self.user.name)
		_body += '&userPubKey=' + escape.url_escape(self.user.pubKey)
		_body += '&host=' + escape.url_escape(self.user.host)
		_body += '&port=' + escape.url_escape(str(self.user.port))
		_body += '&invitation=' + escape.url_escape(self.user.invitation if self.user.invitation is not None else ' ')
		_body += '&approverID=' + escape.url_escape(self.user.approverID if self.user.approverID is not None else ' ')
		_body += '&bookID=' + escape.url_escape(self.book.getID())
		_body += '&password=' + escape.url_escape(_authCode(self.user))
		for user in whoToAsk:
			try:
				address = 'http://' + str(user.host) + ':' + str(user.port)
				if self.askMember:
					response = http_client.fetch(address + '/members', method = 'POST', body = _body)
					members = escape.json_decode(response.body)
					_updateMembers(members, self.user, book, lock, address)
				if self.askAction:
					response = http_client.fetch(address + '/versions', method = 'POST', body = _body)
					versions = escape.json_decode(response.body)
					_updateActions(versions, self.user, book, lock, address)
			except tornado.httpclient.HTTPError as e:
				#todo: log
				print("Error:" + str(e))
				pass
			except ValueError:
				print('ValueError')
开发者ID:aphlysia,项目名称:ahiru,代码行数:31,代码来源:manager.py


示例8: get

    def get(self):
        # get otp from request
        otp = self.get_argument("otp")

        try:
            # make request to auth server
            response = yield AsyncHTTPClient().fetch(
                self.auth_url + '/control/get_user',
                method="POST",
                headers=self.auth_headers,
                body=urlencode({"user_otp": otp})
            )
        except ClientHTTPError as error:
            # check content type for json to try and parse result
            if error.response and error.response.headers.get('Content-Type') is 'application/json; charset=UTF-8':
                # decode the response
                content = json_decode(error.response.body)

                # catch client otp errors
                if content['status_code'] == 400:
                    raise ServerHTTPError(401, reason=content['message'])

            # handle all other cases as internal errors
            raise error

        else:
            # set client cookie and respond
            content = json_decode(response.body)
            self.set_secure_cookie(self.cookie_name, str(content['result']['id']))
            self.write({"result": "OK"})
            self.finish()
开发者ID:i-Dynamics,项目名称:Talk-A2Z,代码行数:31,代码来源:remote_sign_in_handler.py


示例9: _get_api_token

 def _get_api_token(self, world_id, st):
     world_ip = dmm.WORLD_IP[world_id-1]
     url = dmm.GET_FLASH_URL % (world_ip, self.owner, int(time.time()*1000))
     body = urlencode({'url': url,
                       'httpMethod': 'GET',
                       'authz': 'signed',
                       'st': st,
                       'contentType': 'JSON',
                       'numEntries': '3',
                       'getSummaries': 'false',
                       'signOwner': 'true',
                       'signViewer': 'true',
                       'gadget': 'http://203.104.209.7/gadget.xml',
                       'container': 'dmm'})
     try:
         req = yield self.http_client.fetch(dmm.MAKE_REQUEST_URL, method='POST', headers=self.headers, body=body,
                                            connect_timeout=self.connect_timeout,
                                            request_timeout=self.request_timeout,
                                            proxy_host=proxy_host, proxy_port=proxy_port)
     except (CurlError, HTTPError):
         raise OoiAuthError('连接api_token服务器失败')
     svdata = json_decode(native_str(req.body)[27:])
     if svdata[url]['rc'] != 200:
         raise OoiAuthError('获取api_token失败')
     svdata = json_decode(svdata[url]['body'][7:])
     if svdata['api_result'] != 1:
         raise OoiAuthError('获取api_token失败')
     return world_ip, svdata['api_token'], svdata['api_starttime']
开发者ID:ZaoRiTian,项目名称:ooi2,代码行数:28,代码来源:kancolle.py


示例10: test_emit_setup_with_bad_counter_value_type

    def test_emit_setup_with_bad_counter_value_type(self):
        ws = yield self.ws_connect('/ws/counter')

        time.sleep(SLEEPING_TIME)
        response = yield ws.read_message()
        self.assertDictEqual(json_decode(response), {
            'event': 'counter_connection',
            'data': {
                'message': 'Got new connection.',
                'counter_value': 0  # Initial value of counter
            }
        })

        yield ws.write_message(json_encode({
            'event': 'setup',
            'data': {
                'counter_value': 'not_an_integer'
            }
        }))
        time.sleep(SLEEPING_TIME)

        response = yield ws.read_message()
        self.assertDictEqual(json_decode(response), {
            'event': 'counter_error',
            'data': {
                'message': 'Setup initial counter value: FAIL.',
                'details': '"value" is not an integer.'
            }
        })

        self.close(ws)
开发者ID:onysos,项目名称:django-tornado-websockets,代码行数:31,代码来源:test_websocket.py


示例11: test_get_kernels

    def test_get_kernels(self):
        '''Server should respond with running kernel information.'''
        self.app.web_app.settings['kg_list_kernels'] = True
        response = yield self.http_client.fetch(
            self.get_url('/api/kernels')
        )
        self.assertEqual(response.code, 200)
        kernels = json_decode(response.body)
        self.assertEqual(len(kernels), 0)

        # Launch a kernel
        response = yield self.http_client.fetch(
            self.get_url('/api/kernels'),
            method='POST',
            body='{}'
        )
        self.assertEqual(response.code, 201)
        kernel = json_decode(response.body)

        # Check the list again
        response = yield self.http_client.fetch(
            self.get_url('/api/kernels')
        )
        self.assertEqual(response.code, 200)
        kernels = json_decode(response.body)
        self.assertEqual(len(kernels), 1)
        self.assertEqual(kernels[0]['id'], kernel['id'])
开发者ID:bpburns,项目名称:kernel_gateway,代码行数:27,代码来源:test_gatewayapp.py


示例12: post

 def post(self):
     """Add a facility."""
     global compressed_facilities
     if not revisit_online:
         raise tornado.web.HTTPError(502)
     new_facility = json_decode(self.request.body)
     c_facilities_json = json_decode(compressed_facilities)
     facility_data = (
         c_facilities_json['facilities']['children']['wn']['data'][0]
     )
     uncompressed = json_decode(lzs.decompressFromUTF16(facility_data))
     uncompressed.append({
         '_version': 0,
         'active': True,
         'coordinates': new_facility['coordinates'],
         'createdAt': '2014-04-23T20:32:20.043Z',
         'href': (
             'http://localhost:3000/api/v0/facilities/{}.json'.format(
                 new_facility['uuid']
             )
         ),
         'identifiers': [],
         'name': new_facility['name'],
         'properties': new_facility['properties'],
         'updatedAt': '2014-04-23T20:32:20.043Z',
         'uuid': new_facility['uuid'],
     })
     compressed = lzs.compressToUTF16(json_encode(uncompressed))
     c_facilities_json['facilities']['children']['wn']['data'] = [
         compressed
     ]
     compressed_facilities = json_encode(c_facilities_json).encode()
     self.set_status(201)
开发者ID:SEL-Columbia,项目名称:dokomoforms,代码行数:33,代码来源:debug.py


示例13: open

    def open(self, document_id):
        print('Websocket opened')
        current_user = self.get_current_user()
        self.user_info = SessionUserInfo()
        doc_db, can_access = self.user_info.init_access(
            document_id, current_user)

        if can_access:
            if doc_db.id in DocumentWS.sessions:
                self.doc = DocumentWS.sessions[doc_db.id]
                self.id = max(self.doc['participants']) + 1
                print("id when opened %s" % self.id)
            else:
                self.id = 0
                self.doc = dict()
                self.doc['db'] = doc_db
                self.doc['participants'] = dict()
                self.doc['last_diffs'] = json_decode(doc_db.last_diffs)
                self.doc['comments'] = json_decode(doc_db.comments)
                self.doc['settings'] = json_decode(doc_db.settings)
                self.doc['contents'] = json_decode(doc_db.contents)
                self.doc['metadata'] = json_decode(doc_db.metadata)
                self.doc['version'] = doc_db.version
                self.doc['diff_version'] = doc_db.diff_version
                self.doc['comment_version'] = doc_db.comment_version
                self.doc['title'] = doc_db.title
                self.doc['id'] = doc_db.id
                DocumentWS.sessions[doc_db.id] = self.doc
            self.doc['participants'][self.id] = self
            response = dict()
            response['type'] = 'welcome'
            self.write_message(response)
开发者ID:OSCOSS,项目名称:fiduswriter,代码行数:32,代码来源:ws_views.py


示例14: test_login_failed_empty

    def test_login_failed_empty(self):
        post_data = {'action': 'Login',
                     'email': '',
                     'password': 'bar'}
        body = urlencode(post_data)
        self.http_client.fetch(self.get_url('/login/email'),
                               self.stop,
                               method='POST',
                               body=body,
                               follow_redirects=False)
        response = self.wait()
        self.assertEqual(json_decode(response.body)['status'], 'failed')
        self.assertEqual(json_decode(response.body)['error'], 'Email and password are mandatory')

        post_data = {'action': 'Login',
                     'email': 'foo',
                     'password': ''}
        body = urlencode(post_data)
        self.http_client.fetch(self.get_url('/login/email'),
                               self.stop,
                               method='POST',
                               body=body,
                               follow_redirects=False)
        response = self.wait()
        self.assertEqual(json_decode(response.body)['status'], 'failed')
        self.assertEqual(json_decode(response.body)['error'], 'Email and password are mandatory')
开发者ID:fgaudin,项目名称:libre,代码行数:26,代码来源:auth.py


示例15: post

    def post(self):
        data = json_decode(self.request.body)
        logger.debug('Sonarr download: %s', data)
        event_type = data['EventType']
        if event_type in ['Test', 'Rename']:
            return

        http_client = AsyncHTTPClient()
        for episode in data['Episodes']:
            id = episode['Id']
            headers = {'X-Api-Key':env.settings.sonarr_api_key}

            request = HTTPRequest(
                method='GET', headers=headers,
                url='%s/api/Episode/%d' % (env.settings.sonarr_url, id))
            response = yield http_client.fetch(request)
            episode_data = json_decode(response.body)
            logger.debug('Sonarr episode data: %s', episode_data)

            file_id = episode_data['episodeFileId']
            request = HTTPRequest(
                method='GET', headers=headers,
                url='%s/api/EpisodeFile/%d' % (env.settings.sonarr_url, file_id))
            response = yield http_client.fetch(request)
            file_data = json_decode(response.body)
            logger.debug('Sonarr file data: %s', file_data)

            path = file_data['path']
            name = file_data['sceneName']+os.path.splitext(path)[1]
            logger.info("ADD (sonarr): %s -> %s", path, name)
            SuperliminalCore.add_video(path, name)
开发者ID:NigelRook,项目名称:superliminal,代码行数:31,代码来源:api.py


示例16: test_json_errors

    def test_json_errors(self):
        """Handlers should always return JSON errors."""
        # A handler that we override
        response = yield self.http_client.fetch(
            self.get_url('/api/kernels'),
            raise_error=False
        )
        body = json_decode(response.body)
        self.assertEqual(response.code, 403)
        self.assertEqual(body['reason'], 'Forbidden')

        # A handler from the notebook base
        response = yield self.http_client.fetch(
            self.get_url('/api/kernels/1-2-3-4-5'),
            raise_error=False
        )
        body = json_decode(response.body)
        self.assertEqual(response.code, 404)
        # Base handler json_errors decorator does not capture reason properly
        # self.assertEqual(body['reason'], 'Not Found')
        self.assertIn('1-2-3-4-5', body['message'])

        # The last resort not found handler
        response = yield self.http_client.fetch(
            self.get_url('/fake-endpoint'),
            raise_error=False
        )
        body = json_decode(response.body)
        self.assertEqual(response.code, 404)
        self.assertEqual(body['reason'], 'Not Found')
开发者ID:Lomascolo,项目名称:kernel_gateway,代码行数:30,代码来源:test_jupyter_websocket.py


示例17: test_post_valid

    def test_post_valid(self):
        dontcare, uploads_dir = get_mountpoint('uploads')[0]
        foo_fp = os.path.join(uploads_dir, '1', 'foo.txt')
        bar_fp = os.path.join(uploads_dir, '1', 'bar.txt')
        with open(foo_fp, 'w') as fp:
            fp.write("@x\nATGC\n+\nHHHH\n")
        with open(bar_fp, 'w') as fp:
            fp.write("@x\nATGC\n+\nHHHH\n")

        prep = StringIO(EXP_PREP_TEMPLATE.format(1))
        prep_table = load_template_to_dataframe(prep)

        response = self.post('/api/v1/study/1/preparation?data_type=16S',
                             data=prep_table.T.to_dict(),
                             headers=self.headers, asjson=True)
        prepid = json_decode(response.body)['id']

        uri = '/api/v1/study/1/preparation/%d/artifact' % prepid
        # 1 -> fwd or rev sequences in fastq
        # 3 -> barcodes
        body = {'artifact_type': 'FASTQ', 'filepaths': [['foo.txt', 1],
                                                        ['bar.txt',
                                                         'raw_barcodes']],
                'artifact_name': 'a name is a name'}

        response = self.post(uri, data=body, headers=self.headers, asjson=True)
        self.assertEqual(response.code, 201)
        obs = json_decode(response.body)['id']

        prep_instance = PrepTemplate(prepid)
        exp = prep_instance.artifact.id
        self.assertEqual(obs, exp)
开发者ID:antgonza,项目名称:qiita,代码行数:32,代码来源:test_study_preparation.py


示例18: is_login_valid

 def is_login_valid(self, form_data):
     print(form_data)
     http_client = httpclient.HTTPClient()
     login_url = "http://localhost:8007/api/v1/user/login"
     response_body = None
     code = 0
     data = {
         'payload': form_data
     }
     try:
         response = http_client.fetch(httpclient.HTTPRequest(
                 url=login_url, method='POST', body=json_encode(data)))
         code = response.code
         response_body = json_decode(response.body)
     except httpclient.HTTPError as e:
         # HTTPError is raised for non-200 responses; the response
         # can be found in e.response.
         logger.debug("Error: %s" % str(e))
         code = e.response.code
         response_body = json_decode(e.response.body)
     except Exception as e:
         # Other errors are possible, such as IOError.
         logger.error("Error: %s" % str(e))
     http_client.close()
     return {
         'status': code,
         'response': response_body
     }
开发者ID:altamiro,项目名称:podship-pod,代码行数:28,代码来源:services.py


示例19: get_authenticated_user

    def get_authenticated_user(self, redirect_uri, callback, scope=None, **args):
        """
        class RenrenHandler(tornado.web.RequestHandler, RenrenGraphMixin):
            @tornado.web.asynchronous
            @gen.engine
            def get(self):
                self.get_authenticated_user(
                    callback=(yield gen.Callback('key')),
                    redirect_uri=url)
                user = yield gen.Wait('key')
                if not user:
                    raise web.HTTPError(500, "Renren auth failed")
                # do something else
                self.finish()
        """

        code = self.get_argument('code', None)
        if not code:
            self.authorize_redirect(redirect_uri, scope=scope, **args)
            return
        self.get_access_token(
            code, callback=(yield gen.Callback('_RenrenGraphMixin.get_authenticated_user')),
            redirect_uri=redirect_uri)

        response = yield gen.Wait('_RenrenGraphMixin.get_authenticated_user')
        if not response:
            callback(None)
            return
        try:
            user = json_decode(response.body)
        except:
            logging.warning("Error response %s fetching %s", response.body,
                    response.request.url)
            callback(None)
            return
        if 'error' in user:
            logging.warning("Error response %s fetching %s", user['error_description'],
                    response.request.url)
            callback(None)
            return

        #{{{ get session key
        self.renren_request('renren_api/session_key', user['access_token'],
                            callback=(yield gen.Callback('_RenrenGraphMixin._session_key')))
        response = yield gen.Wait('_RenrenGraphMixin._session_key')
        if response.error and not response.body:
            logging.warning("Error response %s fetching %s", response.error,
                    response.request.url)
        elif response.error:
            logging.warning("Error response %s fetching %s: %s", response.error,
                    response.request.url, response.body)
        else:
            try:
                user['session'] = json_decode(response.body)
            except:
                pass
        #}}} #TODO delete when renren graph api released
        callback(user)
        return
开发者ID:321cyb,项目名称:startup-blog,代码行数:59,代码来源:renren.py


示例20: test_1_register

 def test_1_register(self):
     """first test function"""
     client = yield websocket_connect("ws://localhost:8080/websocket", self.io_loop)
     
     request_id = 0
     client.write_message(json_encode({
                                       "action":"register",
                                       "request_id": request_id,
                                       "args":{
                                               "username":"bert", 
                                               "password":"bert" }}))
     response = yield self.read_until_request_id(client, request_id)
     print('got: {}'.format(response))
     response = json_decode(response)
     user = response["result"]
     
     
     request_id += 1
     client.write_message(json_encode({
                                       "action":"session_start",
                                       "request_id": request_id,
                                       "args":{
                                               "user_id":user["id"]}}))
     response = yield self.read_until_request_id(client, request_id)
     print('got: {}'.format(response))
     response = json_decode(response)
     session = response["result"]
     
     
     request_id += 1
     client.write_message(json_encode({
                                       "action":"draw_chat_object",
                                       "request_id": request_id,
                                       "args":{
                                               "user_id":user["id"],
                                               "session_id": session["id"],
                                               "type_": "Text",
                                               "color": "255:0:0:255",
                                               "point_x": 100,
                                               "point_y": 60,
                                               "value": "foo2"}}))
     response = yield self.read_until_request_id(client, request_id)
     print('got: {}'.format(response))
     
     
     request_id += 1
     client.write_message(json_encode({
                                       "action":"draw_chat_object",
                                       "request_id": request_id,
                                       "args":{
                                               "user_id":user["id"],
                                               "session_id": session["id"],
                                               "type_": "Text",
                                               "color": "0:0:255:255",
                                               "point_x": 100,
                                               "point_y": 110,
                                               "value": "bar2"}}))
     response = yield self.read_until_request_id(client, request_id)
     print('got: {}'.format(response))
开发者ID:OliverDashiell,项目名称:DrawChat,代码行数:59,代码来源:test_rpc.py



注:本文中的tornado.escape.json_decode函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python escape.json_encode函数代码示例发布时间:2022-05-27
下一篇:
Python escape._unicode函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap