• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.json_dumps函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中redash.utils.json_dumps函数的典型用法代码示例。如果您正苦于以下问题:Python json_dumps函数的具体用法?Python json_dumps怎么用?Python json_dumps使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了json_dumps函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: embed

def embed(query_id, visualization_id):

    query = models.Query.get_by_id(query_id)
    vis = query.visualizations.where(models.Visualization.id == visualization_id).first()
    qr = {}
    if vis is not None:
        vis = vis.to_dict()
        qr = query.latest_query_data
        if qr is None:
            abort(400, message="No Results for this query")
        else:
            qr = qr.to_dict()
    else:
        abort(404, message="Visualization not found.")

    client_config = {}
    client_config.update(settings.COMMON_CLIENT_CONFIG)

    return render_template(
        "embed.html",
        name=settings.NAME,
        client_config=json_dumps(client_config),
        visualization=json_dumps(vis),
        query_result=json_dumps(qr),
        analytics=settings.ANALYTICS,
    )
开发者ID:johnkabler,项目名称:redash,代码行数:26,代码来源:embed.py


示例2: embed

def embed(query_id, visualization_id, org_slug=None):
    # TODO: add event for embed access
    query = models.Query.get_by_id_and_org(query_id, current_org)
    vis = query.visualizations.where(models.Visualization.id == visualization_id).first()
    qr = {}

    if vis is not None:
        vis = vis.to_dict()
        qr = query.latest_query_data
        if qr is None:
            abort(400, message="No Results for this query")
        else:
            qr = qr.to_dict()
    else:
        abort(404, message="Visualization not found.")

    client_config = {}
    client_config.update(settings.COMMON_CLIENT_CONFIG)

    if settings.MULTI_ORG:
        base_href = url_for('index', _external=True, org_slug=current_org.slug)
    else:
        base_href = url_for('index', _external=True)

    return render_template("embed.html",
                           name=settings.NAME,
                           base_href=base_href,
                           client_config=json_dumps(client_config),
                           visualization=json_dumps(vis),
                           query_result=json_dumps(qr),
                           analytics=settings.ANALYTICS)
开发者ID:kazuhisam3,项目名称:redash,代码行数:31,代码来源:embed.py


示例3: embed

def embed(query_id, visualization_id, org_slug=None):
    # TODO: add event for embed access
    query = models.Query.get_by_id_and_org(query_id, current_org)
    require_access(query.groups, current_user, view_only)
    vis = query.visualizations.where(models.Visualization.id == visualization_id).first()
    qr = {}

    if vis is not None:
        vis = vis.to_dict()
        qr = query.latest_query_data
        if qr is None:
            abort(400, message="No Results for this query")
        else:
            qr = qr.to_dict()
    else:
        abort(404, message="Visualization not found.")

    client_config = {}
    client_config.update(settings.COMMON_CLIENT_CONFIG)

    qr = project(qr, ('data', 'id', 'retrieved_at'))
    vis = project(vis, ('description', 'name', 'id', 'options', 'query', 'type', 'updated_at'))
    vis['query'] = project(vis, ('created_at', 'description', 'name', 'id', 'latest_query_data_id', 'name', 'updated_at'))

    return render_template("embed.html",
                           name=settings.NAME,
                           base_href=base_href(),
                           client_config=json_dumps(client_config),
                           visualization=json_dumps(vis),
                           query_result=json_dumps(qr),
                           analytics=settings.ANALYTICS)
开发者ID:snirad,项目名称:redash,代码行数:31,代码来源:embed.py


示例4: embed

def embed(query_id, visualization_id, org_slug=None):
    query = models.Query.get_by_id_and_org(query_id, current_org)
    require_access(query.groups, current_user, view_only)
    vis = query.visualizations.where(models.Visualization.id == visualization_id).first()
    qr = {}

    parameter_values = collect_parameters_from_request(request.args)

    if vis is not None:
        vis = vis.to_dict()
        qr = query.latest_query_data
        logging.info("jonhere")
        logging.info( settings.ALLOW_PARAMETERS_IN_EMBEDS)
        if settings.ALLOW_PARAMETERS_IN_EMBEDS == True and len(parameter_values) > 0:
            #abort(404,message="jlk") 
            # run parameterized query
            #
            # WARNING: Note that the external query parameters
            #          are a potential risk of SQL injections.
            #
            results = run_query_sync(query.data_source, parameter_values, query.query)
            logging.info("jonhere2")
            logging.info("results")
            if results is None:
                abort(400, message="Unable to get results for this query")
            else:
                qr = {"data": json.loads(results)}
        elif qr is None:
            abort(400, message="No Results for this query")
        else:
            qr = qr.to_dict()
    else:
        abort(404, message="Visualization not found.")

    record_event(current_org, current_user, {
        'action': 'view',
        'object_id': visualization_id,
        'object_type': 'visualization',
        'query_id': query_id,
        'embed': True,
        'referer': request.headers.get('Referer')
    })

    client_config = {}
    client_config.update(settings.COMMON_CLIENT_CONFIG)

    qr = project(qr, ('data', 'id', 'retrieved_at'))
    vis = project(vis, ('description', 'name', 'id', 'options', 'query', 'type', 'updated_at'))
    vis['query'] = project(vis['query'], ('created_at', 'description', 'name', 'id', 'latest_query_data_id', 'name', 'updated_at'))

    return render_template("embed.html",
                           client_config=json_dumps(client_config),
                           visualization=json_dumps(vis),
                           query_result=json_dumps(qr))
开发者ID:solutionrooms,项目名称:testredash,代码行数:54,代码来源:embed.py


示例5: run_query

    def run_query(self, query, user):
        connection = phoenixdb.connect(
            url=self.configuration.get('url', ''),
            autocommit=True)

        cursor = connection.cursor()

        try:
            cursor.execute(query)
            column_tuples = [(i[0], TYPES_MAPPING.get(i[1], None)) for i in cursor.description]
            columns = self.fetch_columns(column_tuples)
            rows = [dict(zip(([c['name'] for c in columns]), r)) for i, r in enumerate(cursor.fetchall())]
            data = {'columns': columns, 'rows': rows}
            json_data = json_dumps(data)
            error = None
            cursor.close()
        except Error as e:
            json_data = None
            error = 'code: {}, sql state:{}, message: {}'.format(e.code, e.sqlstate, e.message)
        except (KeyboardInterrupt, InterruptException) as e:
            error = "Query cancelled by user."
            json_data = None
        except Exception as ex:
            json_data = None
            error = unicode(ex)
        finally:
            if connection:
                connection.close()

        return json_data, error
开发者ID:ariarijp,项目名称:redash,代码行数:30,代码来源:phoenix.py


示例6: make_request

    def make_request(self, method, path, org=None, user=None, data=None,
                     is_json=True):
        if user is None:
            user = self.factory.user

        if org is None:
            org = self.factory.org

        if org is not False:
            path = "/{}{}".format(org.slug, path)

        if user:
            authenticate_request(self.client, user)

        method_fn = getattr(self.client, method.lower())
        headers = {}

        if data and is_json:
            data = json_dumps(data)

        if is_json:
            content_type = 'application/json'
        else:
            content_type = None

        response = method_fn(path, data=data, headers=headers, content_type=content_type)

        if response.data and is_json:
            response.json = json.loads(response.data)

        return response
开发者ID:ezioavi42,项目名称:redash,代码行数:31,代码来源:__init__.py


示例7: run_query

    def run_query(self, query, user):
        connection = sqlite3.connect(self._dbpath)

        cursor = connection.cursor()

        try:
            cursor.execute(query)

            if cursor.description is not None:
                columns = self.fetch_columns([(i[0], None) for i in cursor.description])
                rows = [dict(zip((c['name'] for c in columns), row)) for row in cursor]

                data = {'columns': columns, 'rows': rows}
                error = None
                json_data = json_dumps(data)
            else:
                error = 'Query completed but it returned no data.'
                json_data = None
        except KeyboardInterrupt:
            connection.cancel()
            error = "Query cancelled by user."
            json_data = None
        except Exception as e:
            # handle unicode error message
            err_class = sys.exc_info()[1].__class__
            err_args = [arg.decode('utf-8') for arg in sys.exc_info()[1].args]
            unicode_err = err_class(*err_args)
            reraise(unicode_err, None, sys.exc_info()[2])
        finally:
            connection.close()
        return json_data, error
开发者ID:ariarijp,项目名称:redash,代码行数:31,代码来源:sqlite.py


示例8: notify

    def notify(self, alert, query, user, new_state, app, host, options):
        # Documentation: https://api.slack.com/docs/attachments
        fields = [
            {
                "title": "Query",
                "value": "{host}/queries/{query_id}".format(host=host, query_id=query.id),
                "short": True
            },
            {
                "title": "Alert",
                "value": "{host}/alerts/{alert_id}".format(host=host, alert_id=alert.id),
                "short": True
            }
        ]
        if new_state == "triggered":
            text = alert.name + " just triggered"
            color = "#c0392b"
        else:
            text = alert.name + " went back to normal"
            color = "#27ae60"
        
        payload = {'attachments': [{'text': text, 'color': color, 'fields': fields}]}

        if options.get('username'): payload['username'] = options.get('username')
        if options.get('icon_emoji'): payload['icon_emoji'] = options.get('icon_emoji')
        if options.get('icon_url'): payload['icon_url'] = options.get('icon_url')
        if options.get('channel'): payload['channel'] = options.get('channel')

        try:
            resp = requests.post(options.get('url'), data=json_dumps(payload), timeout=5.0)
            logging.warning(resp.text)
            if resp.status_code != 200:
                logging.error("Slack send ERROR. status_code => {status}".format(status=resp.status_code))
        except Exception:
            logging.exception("Slack send ERROR.")
开发者ID:ariarijp,项目名称:redash,代码行数:35,代码来源:slack.py


示例9: post

    def post(self):
        """
        Add a widget to a dashboard.

        :<json number dashboard_id: The ID for the dashboard being added to
        :<json visualization_id: The ID of the visualization to put in this widget
        :<json object options: Widget options
        :<json string text: Text box contents
        :<json number width: Width for widget display

        :>json object widget: The created widget
        """
        widget_properties = request.get_json(force=True)
        dashboard = models.Dashboard.get_by_id_and_org(widget_properties.pop('dashboard_id'), self.current_org)
        require_object_modify_permission(dashboard, self.current_user)

        widget_properties['options'] = json_dumps(widget_properties['options'])
        widget_properties.pop('id', None)
        widget_properties['dashboard'] = dashboard

        visualization_id = widget_properties.pop('visualization_id')
        if visualization_id:
            visualization = models.Visualization.get_by_id_and_org(visualization_id, self.current_org)
            require_access(visualization.query_rel, self.current_user, view_only)
        else:
            visualization = None

        widget_properties['visualization'] = visualization

        widget = models.Widget(**widget_properties)
        models.db.session.add(widget)
        models.db.session.commit()

        models.db.session.commit()
        return serialize_widget(widget)
开发者ID:ariarijp,项目名称:redash,代码行数:35,代码来源:widgets.py


示例10: run_query

    def run_query(self, query, user):
        connection = atsd_client.connect_url(self.url,
                                             self.configuration.get('username'),
                                             self.configuration.get('password'),
                                             verify=self.configuration.get('trust_certificate', False),
                                             timeout=self.configuration.get('timeout', 600))
        sql = SQLService(connection)
        query_id = str(uuid.uuid4())

        try:
            logger.debug("SQL running query: %s", query)
            data = sql.query_with_params(query, {'outputFormat': 'csv', 'metadataFormat': 'EMBED',
                                                 'queryId': query_id})

            columns, rows = generate_rows_and_columns(data)

            data = {'columns': columns, 'rows': rows}
            json_data = json_dumps(data)
            error = None

        except SQLException as e:
            json_data = None
            error = e.content
        except (KeyboardInterrupt, InterruptException):
            sql.cancel_query(query_id)
            error = "Query cancelled by user."
            json_data = None

        return json_data, error
开发者ID:ariarijp,项目名称:redash,代码行数:29,代码来源:axibase_tsd.py


示例11: public_dashboard

def public_dashboard(token, org_slug=None):
    # TODO: verify object is a dashboard?
    if not isinstance(current_user, models.ApiUser):
        api_key = models.ApiKey.get_by_api_key(token)
        dashboard = api_key.object
    else:
        dashboard = current_user.object

    user = {
        'permissions': [],
        'apiKey': current_user.id
    }

    headers = {
        'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate'
    }

    record_event(current_org, current_user, {
        'action': 'view',
        'object_id': dashboard.id,
        'object_type': 'dashboard',
        'public': True,
        'headless': 'embed' in request.args,
        'referer': request.headers.get('Referer')
    })

    response = render_template("public.html",
                               headless='embed' in request.args,
                               user=json.dumps(user),
                               seed_data=json_dumps({
                                 'dashboard': serializers.public_dashboard(dashboard)
                               }),
                               client_config=json.dumps(settings.COMMON_CLIENT_CONFIG))

    return response, 200, headers
开发者ID:alexsukstansky,项目名称:redash,代码行数:35,代码来源:embed.py


示例12: run_query

    def run_query(self, query, user):
        connection = snowflake.connector.connect(
            user=self.configuration['user'],
            password=self.configuration['password'],
            account=self.configuration['account'],
        )

        cursor = connection.cursor()

        try:
            cursor.execute("USE WAREHOUSE {}".format(self.configuration['warehouse']))
            cursor.execute("USE {}".format(self.configuration['database']))

            cursor.execute(query)

            columns = self.fetch_columns([(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description])
            rows = [dict(zip((c['name'] for c in columns), row)) for row in cursor]

            data = {'columns': columns, 'rows': rows}
            error = None
            json_data = json_dumps(data)
        finally:
            cursor.close()
            connection.close()

        return json_data, error
开发者ID:13768324554,项目名称:redash,代码行数:26,代码来源:snowflake.py


示例13: json_representation

def json_representation(data, code, headers=None):
    # Flask-Restful checks only for flask.Response but flask-login uses werkzeug.wrappers.Response
    if isinstance(data, Response):
        return data
    resp = make_response(json_dumps(data), code)
    resp.headers.extend(headers or {})
    return resp
开发者ID:Captricity,项目名称:redash,代码行数:7,代码来源:api.py


示例14: run_query

    def run_query(self, query, user):
        connection = self._get_connection()
        _wait(connection, timeout=10)

        cursor = connection.cursor()

        try:
            cursor.execute(query)
            _wait(connection)

            if cursor.description is not None:
                columns = self.fetch_columns([(i[0], types_map.get(i[1], None)) for i in cursor.description])
                rows = [dict(zip((c['name'] for c in columns), row)) for row in cursor]

                data = {'columns': columns, 'rows': rows}
                error = None
                json_data = json_dumps(data, ignore_nan=True)
            else:
                error = 'Query completed but it returned no data.'
                json_data = None
        except (select.error, OSError) as e:
            error = "Query interrupted. Please retry."
            json_data = None
        except psycopg2.DatabaseError as e:
            error = e.message
            json_data = None
        except (KeyboardInterrupt, InterruptException):
            connection.cancel()
            error = "Query cancelled by user."
            json_data = None
        finally:
            connection.close()

        return json_data, error
开发者ID:rohithreddy,项目名称:redash,代码行数:34,代码来源:pg.py


示例15: run_query

    def run_query(self, query, user):
        connection = None
        try:
            if self.configuration.get('username', '') and self.configuration.get('password', ''):
                auth_provider = PlainTextAuthProvider(username='{}'.format(self.configuration.get('username', '')),
                                                      password='{}'.format(self.configuration.get('password', '')))
                connection = Cluster([self.configuration.get('host', '')],
                                     auth_provider=auth_provider,
                                     port=self.configuration.get('port', ''),
                                     protocol_version=self.configuration.get('protocol', 3))
            else:
                connection = Cluster([self.configuration.get('host', '')],
                                     port=self.configuration.get('port', ''),
                                     protocol_version=self.configuration.get('protocol', 3))
            session = connection.connect()
            session.set_keyspace(self.configuration['keyspace'])
            session.default_timeout = self.configuration.get('timeout', 10)
            logger.debug("Cassandra running query: %s", query)
            result = session.execute(query)

            column_names = result.column_names

            columns = self.fetch_columns(map(lambda c: (c, 'string'), column_names))

            rows = [dict(zip(column_names, row)) for row in result]

            data = {'columns': columns, 'rows': rows}
            json_data = json_dumps(data, cls=CassandraJSONEncoder)

            error = None
        except KeyboardInterrupt:
            error = "Query cancelled by user."
            json_data = None

        return json_data, error
开发者ID:ariarijp,项目名称:redash,代码行数:35,代码来源:cass.py


示例16: run_query

    def run_query(self, query, user):
        logger.debug("BigQuery got query: %s", query)

        bigquery_service = self._get_bigquery_service()
        jobs = bigquery_service.jobs()

        try:
            if "totalMBytesProcessedLimit" in self.configuration:
                limitMB = self.configuration["totalMBytesProcessedLimit"]
                processedMB = self._get_total_bytes_processed(jobs, query) / 1000.0 / 1000.0
                if limitMB < processedMB:
                    return None, "Larger than %d MBytes will be processed (%f MBytes)" % (limitMB, processedMB)

            data = self._get_query_result(jobs, query)
            error = None

            json_data = json_dumps(data, ignore_nan=True)
        except apiclient.errors.HttpError as e:
            json_data = None
            if e.resp.status == 400:
                error = json_loads(e.content)['error']['message']
            else:
                error = e.content
        except KeyboardInterrupt:
            error = "Query cancelled by user."
            json_data = None

        return json_data, error
开发者ID:getredash,项目名称:redash,代码行数:28,代码来源:big_query.py


示例17: public_dashboard

def public_dashboard(token, org_slug=None):
    # TODO: verify object is a dashboard?
    if not isinstance(current_user, models.ApiUser):
        api_key = models.ApiKey.get_by_api_key(token)
        dashboard = api_key.object
    else:
        dashboard = current_user.object

    user = {
        'permissions': [],
        'apiKey': current_user.id
    }

    headers = {
        'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate'
    }

    response = render_template("public.html",
                               headless='embed' in request.args,
                               user=json.dumps(user),
                               seed_data=json_dumps({
                                 'dashboard': serializers.public_dashboard(dashboard)
                               }),
                               base_href=base_href(),
                               name=settings.NAME,
                               client_config=json.dumps(settings.COMMON_CLIENT_CONFIG),
                               analytics=settings.ANALYTICS)

    return response, 200, headers
开发者ID:snirad,项目名称:redash,代码行数:29,代码来源:embed.py


示例18: run_query

    def run_query(self, query, user):
        logger.debug("Metrica is about to execute query: %s", query)
        data = None
        query = query.strip()
        if query == "":
            error = "Query is empty"
            return data, error
        try:
            params = yaml.safe_load(query)
        except ValueError as e:
            logging.exception(e)
            error = unicode(e)
            return data, error

        if isinstance(params, dict):
            if 'url' in params:
                params = parse_qs(urlparse(params['url']).query, keep_blank_values=True)
        else:
            error = 'The query format must be JSON or YAML'
            return data, error

        try:
            data = json_dumps(parse_ym_response(self._send_query(**params)))
            error = None
        except Exception as e:
            logging.exception(e)
            error = unicode(e)
        return data, error
开发者ID:jonyboy2000,项目名称:redash,代码行数:28,代码来源:yandex_metrica.py


示例19: run_query

    def run_query(self, query, user):
        url = self.configuration['url']
        kylinuser = self.configuration['user']
        kylinpass = self.configuration['password']
        kylinproject = self.configuration['project']

        resp = requests.post(
            os.path.join(url, "api/query"),
            auth=HTTPBasicAuth(kylinuser, kylinpass),
            json={
                "sql": query,
                "offset": settings.KYLIN_OFFSET,
                "limit": settings.KYLIN_LIMIT,
                "acceptPartial": settings.KYLIN_ACCEPT_PARTIAL,
                "project": kylinproject
            }
        )

        if not resp.ok:
            return {}, resp.text or str(resp.reason)

        data = resp.json()
        columns = self.get_columns(data['columnMetas'])
        rows = self.get_rows(columns, data['results'])

        return json_dumps({'columns': columns, 'rows': rows}), None
开发者ID:ariarijp,项目名称:redash,代码行数:26,代码来源:kylin.py


示例20: test_supports_relative_timestamps

    def test_supports_relative_timestamps(self):
        query = {
            'ts': {'$humanTime': '1 hour ago'}
        }

        one_hour_ago = parse_human_time("1 hour ago")
        query_data = parse_query_json(json_dumps(query))
        self.assertEqual(query_data['ts'], one_hour_ago)
开发者ID:ariarijp,项目名称:redash,代码行数:8,代码来源:test_mongodb.py



注:本文中的redash.utils.json_dumps函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.json_loads函数代码示例发布时间:2022-05-26
下一篇:
Python utils.gen_query_hash函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap