• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Python utils.json_loads函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 redash.utils.json_loads 函数的典型用法代码示例。如果您正苦于以下问题：Python json_loads 函数的具体用法？Python json_loads 怎么用？Python json_loads 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 json_loads 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: get_schema

    def get_schema(self, get_stats=False):
        """Build a table -> columns mapping for the configured keyspace.

        First reads ``release_version`` from ``system.local``. When that
        version starts with ``'2'`` the columns are read from
        ``system.schema_columns``; otherwise from ``system_schema.columns``.

        ``get_stats`` is accepted but not used by this method.

        Returns ``schema.values()``: one ``{'name': ..., 'columns': [...]}``
        dict per table found.

        Raises ``Exception`` if either query reports an error.
        """
        query = """
        select release_version from system.local;
        """
        results, error = self.run_query(query, None)
        if error is not None:
            # Fail with an explicit schema error instead of handing an
            # unusable result to json_loads.
            # NOTE(review): presumably results is None when error is set.
            raise Exception("Failed getting schema.")
        results = json_loads(results)
        release_version = results['rows'][0]['release_version']

        query = """
        SELECT table_name, column_name
        FROM system_schema.columns
        WHERE keyspace_name ='{}';
        """.format(self.configuration['keyspace'])

        if release_version.startswith('2'):
            # Version 2.x is queried through system.schema_columns, where the
            # table name column is called columnfamily_name.
            query = """
                SELECT columnfamily_name AS table_name, column_name
                FROM system.schema_columns
                WHERE keyspace_name ='{}';
                """.format(self.configuration['keyspace'])

        results, error = self.run_query(query, None)
        if error is not None:
            raise Exception("Failed getting schema.")
        results = json_loads(results)

        schema = {}
        for row in results['rows']:
            table_name = row['table_name']
            column_name = row['column_name']
            if table_name not in schema:
                schema[table_name] = {'name': table_name, 'columns': []}
            schema[table_name]['columns'].append(column_name)

        return schema.values()
开发者ID:ariarijp,项目名称:redash,代码行数:33,代码来源:cass.py


示例2: public_widget

def public_widget(widget):
    """Serialize *widget* into a plain dict.

    The result always carries the widget's id, width, parsed options, text
    and timestamps. When the widget has a visualization with a truthy id, a
    ``'visualization'`` entry is added that also embeds the latest query
    result of the visualization's query.
    """
    res = {
        'id': widget.id,
        'width': widget.width,
        'options': json_loads(widget.options),
        'text': widget.text,
        'updated_at': widget.updated_at,
        'created_at': widget.created_at
    }

    # Nothing more to add for widgets without a usable visualization.
    if not (widget.visualization and widget.visualization.id):
        return res

    latest_id = widget.visualization.query_rel.latest_query_data_id
    query_data = models.QueryResult.query.get(latest_id).to_dict()

    query_part = {
        'query': ' ',  # workaround, as otherwise the query data won't be loaded.
        'name': widget.visualization.query_rel.name,
        'description': widget.visualization.query_rel.description,
        'options': {},
        'latest_query_data': query_data
    }

    res['visualization'] = {
        'type': widget.visualization.type,
        'name': widget.visualization.name,
        'description': widget.visualization.description,
        'options': json_loads(widget.visualization.options),
        'updated_at': widget.visualization.updated_at,
        'created_at': widget.visualization.created_at,
        'query': query_part
    }

    return res
开发者ID:ariarijp,项目名称:redash,代码行数:29,代码来源:serializers.py


示例3: get_waiting_in_queue

def get_waiting_in_queue(queue_name):
    """Describe every message currently stored in the list *queue_name*.

    Each raw entry is parsed as JSON and turned into a row dict with state
    ``'waiting_in_queue'``. The row is then extended with the arguments
    parsed from the message's ``argsrepr`` header; if that header is not
    valid JSON the arguments are treated as empty.
    """
    jobs = []

    for raw in redis_connection.lrange(queue_name, 0, -1):
        job = json_loads(raw)
        headers = job['headers']

        try:
            args = json_loads(headers['argsrepr'])
            # Ad-hoc queries have no real query id; report it as None.
            if args.get('query_id') == 'adhoc':
                args['query_id'] = None
        except ValueError:
            args = {}

        job_row = {
            'state': 'waiting_in_queue',
            'task_name': headers['task'],
            'worker': None,
            'worker_pid': None,
            'start_time': None,
            'task_id': headers['id'],
            'queue': job['properties']['delivery_info']['routing_key']
        }
        job_row.update(args)

        jobs.append(job_row)

    return jobs
开发者ID:ariarijp,项目名称:redash,代码行数:25,代码来源:monitor.py


示例4: process_formdata

 def process_formdata(self, valuelist):
     """Store the first submitted value after checking that it is valid JSON.

     An empty or missing *valuelist* stores the empty string. The value is
     only validated, not converted: the original text is what gets stored.
     Raises ``ValueError`` with a translated 'Invalid JSON' message when
     parsing fails.
     """
     if not valuelist:
         self.data = ''
         return

     raw_value = valuelist[0]
     try:
         json_loads(raw_value)
     except ValueError:
         raise ValueError(self.gettext(u'Invalid JSON'))

     self.data = raw_value
开发者ID:jonyboy2000,项目名称:redash,代码行数:9,代码来源:admin.py


示例5: run_query

    def run_query(self, query, user):
        """Run *query* and return a ``(json_data, error)`` pair.

        When ``totalMBytesProcessedLimit`` is configured, the amount of data
        the query would process is computed first and the query is refused
        if it exceeds that limit. On success ``json_data`` holds the
        serialized result and ``error`` is ``None``; on failure ``json_data``
        is ``None`` and ``error`` describes the problem.

        ``user`` is not used by this method.
        """
        logger.debug("BigQuery got query: %s", query)

        jobs = self._get_bigquery_service().jobs()

        json_data = None
        error = None

        try:
            if "totalMBytesProcessedLimit" in self.configuration:
                limit_mb = self.configuration["totalMBytesProcessedLimit"]
                processed_mb = self._get_total_bytes_processed(jobs, query) / 1000.0 / 1000.0
                if limit_mb < processed_mb:
                    return None, "Larger than %d MBytes will be processed (%f MBytes)" % (limit_mb, processed_mb)

            result = self._get_query_result(jobs, query)
            json_data = json_dumps(result, ignore_nan=True)
        except apiclient.errors.HttpError as e:
            json_data = None
            # For status 400 the message is taken from the JSON error body;
            # any other status reports the raw response content.
            if e.resp.status == 400:
                error = json_loads(e.content)['error']['message']
            else:
                error = e.content
        except KeyboardInterrupt:
            json_data = None
            error = "Query cancelled by user."

        return json_data, error
开发者ID:getredash,项目名称:redash,代码行数:28,代码来源:big_query.py


示例6: parse_tasks

def parse_tasks(task_lists, state):
    """Flatten per-worker task lists into a list of row dicts.

    *task_lists* is a mapping whose values are iterables of task dicts;
    every task becomes one row tagged with *state*. Rows for
    ``redash.tasks.execute_query`` tasks are additionally extended with the
    task's JSON-decoded ``args`` (empty if they cannot be decoded).
    """
    rows = []

    for task in itertools.chain.from_iterable(task_lists.values()):
        name = task['name']
        task_row = {
            'state': state,
            'task_name': name,
            'worker': task['hostname'],
            'queue': task['delivery_info']['routing_key'],
            'task_id': task['id'],
            'worker_pid': task['worker_pid'],
            'start_time': task['time_start'],
        }

        if name == 'redash.tasks.execute_query':
            try:
                args = json_loads(task['args'])
            except ValueError:
                args = {}

            # Ad-hoc queries have no real query id; report it as None.
            if args.get('query_id') == 'adhoc':
                args['query_id'] = None

            task_row.update(args)

        rows.append(task_row)

    return rows
开发者ID:ariarijp,项目名称:redash,代码行数:28,代码来源:monitor.py


示例7: _get_tables

    def _get_tables(self, schema):
        """Fill *schema* with the columns of all tables outside system schemas.

        Tables in the schema named by ``self.configuration['db']`` are keyed
        by bare table name; tables from any other schema are keyed as
        ``<schema>.<table>``. *schema* is mutated in place.

        Returns ``schema.values()``. Raises ``Exception`` if the query
        reports an error.
        """
        query = """
        SELECT col.table_schema,
               col.table_name,
               col.column_name
        FROM `information_schema`.`columns` col
        WHERE col.table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
        """

        payload, error = self.run_query(query, None)
        if error is not None:
            raise Exception("Failed getting schema.")

        for row in json_loads(payload)['rows']:
            if row['table_schema'] != self.configuration['db']:
                table_name = u'{}.{}'.format(row['table_schema'], row['table_name'])
            else:
                table_name = row['table_name']

            # Create the entry on first sight of the table, then add the column.
            entry = schema.setdefault(table_name, {'name': table_name, 'columns': []})
            entry['columns'].append(row['column_name'])

        return schema.values()
开发者ID:jonyboy2000,项目名称:redash,代码行数:28,代码来源:mysql.py


示例8: get_schema

    def get_schema(self, get_stats=False):
        """Return the columns of every table outside INFORMATION_SCHEMA.

        Tables are keyed as ``<TABLE_SCHEMA>.<TABLE_NAME>``. ``get_stats`` is
        accepted but not used by this method.

        Returns the ``values()`` of the collected mapping: one
        ``{'name': ..., 'columns': [...]}`` dict per table. Raises
        ``Exception`` if the query reports an error.
        """
        query = """
        SELECT TABLE_SCHEMA,
               TABLE_NAME,
               COLUMN_NAME
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA <> 'INFORMATION_SCHEMA'
        """

        payload, error = self.run_query(query, None)
        if error is not None:
            raise Exception("Failed getting schema.")

        tables = {}
        for row in json_loads(payload)['rows']:
            full_name = '{}.{}'.format(row['TABLE_SCHEMA'], row['TABLE_NAME'])
            # Create the entry on first sight of the table, then add the column.
            entry = tables.setdefault(full_name, {'name': full_name, 'columns': []})
            entry['columns'].append(row['COLUMN_NAME'])

        return tables.values()
开发者ID:ariarijp,项目名称:redash,代码行数:26,代码来源:druid.py


示例9: get_schema

    def get_schema(self, get_stats=False):
        """Return the columns of every table in the configured database.

        The query targets ``<database>.information_schema.columns`` using
        ``self.configuration['database']`` and skips INFORMATION_SCHEMA
        itself. Tables are keyed as ``<TABLE_SCHEMA>.<TABLE_NAME>``.
        ``get_stats`` is accepted but not used by this method.

        Returns the ``values()`` of the collected mapping: one
        ``{'name': ..., 'columns': [...]}`` dict per table. Raises
        ``Exception`` if the query reports an error.
        """
        query = """
        SELECT col.table_schema,
               col.table_name,
               col.column_name
        FROM {database}.information_schema.columns col
        WHERE col.table_schema <> 'INFORMATION_SCHEMA'
        """.format(database=self.configuration['database'])

        payload, error = self.run_query(query, None)
        if error is not None:
            raise Exception("Failed getting schema.")

        tables = {}
        for row in json_loads(payload)['rows']:
            full_name = '{}.{}'.format(row['TABLE_SCHEMA'], row['TABLE_NAME'])
            # Create the entry on first sight of the table, then add the column.
            entry = tables.setdefault(full_name, {'name': full_name, 'columns': []})
            entry['columns'].append(row['COLUMN_NAME'])

        return tables.values()
开发者ID:ariarijp,项目名称:redash,代码行数:26,代码来源:snowflake.py


示例10: run_query

    def run_query(self, query, user):
        """Run a JSON-encoded search request and return ``(data, error)``.

        *query* is JSON text. Its optional ``queryType`` (default
        ``'select'``) and ``fieldMapping`` keys are removed before the
        remaining keys are sent as request parameters. Count queries request
        a single result with no fields; other queries default
        ``maxResults`` to 1000.

        ``user`` is not used by this method.
        """
        jql_url = '{}/rest/api/2/search'.format(self.configuration["url"])

        try:
            params = json_loads(query)
            query_type = params.pop('queryType', 'select')
            field_mapping = FieldMapping(params.pop('fieldMapping', {}))
            is_count = query_type == 'count'

            if is_count:
                params['maxResults'] = 1
                params['fields'] = ''
            else:
                params.setdefault('maxResults', 1000)

            response, error = self.get_response(jql_url, params=params)
            if error is not None:
                return None, error

            data = response.json()
            results = parse_count(data) if is_count else parse_issues(data, field_mapping)

            return results.to_json(), None
        except KeyboardInterrupt:
            return None, "Query cancelled by user."
开发者ID:jonyboy2000,项目名称:redash,代码行数:28,代码来源:jql.py


示例11: _get_tables

    def _get_tables(self, schema):
        """Fill *schema* with the columns of all tables outside built-in schemas.

        Tables in the schema named by ``self.configuration['db']`` are keyed
        by bare table name; tables from any other schema are keyed as
        ``<schema>.<table>``. *schema* is mutated in place.

        Returns ``schema.values()``. Raises ``Exception`` if the query
        reports an error.
        """
        query = """
        SELECT table_schema, table_name, column_name
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE table_schema NOT IN ('guest','INFORMATION_SCHEMA','sys','db_owner','db_accessadmin'
                                  ,'db_securityadmin','db_ddladmin','db_backupoperator','db_datareader'
                                  ,'db_datawriter','db_denydatareader','db_denydatawriter'
                                  );
        """

        payload, error = self.run_query(query, None)
        if error is not None:
            raise Exception("Failed getting schema.")

        for row in json_loads(payload)['rows']:
            if row['table_schema'] != self.configuration['db']:
                table_name = u'{}.{}'.format(row['table_schema'], row['table_name'])
            else:
                table_name = row['table_name']

            # Create the entry on first sight of the table, then add the column.
            entry = schema.setdefault(table_name, {'name': table_name, 'columns': []})
            entry['columns'].append(row['column_name'])

        return schema.values()
开发者ID:ariarijp,项目名称:redash,代码行数:29,代码来源:mssql.py


示例12: test_get_dashboard

    def test_get_dashboard(self):
        """Fetching a dashboard by slug returns its serialized form."""
        d1 = self.factory.create_dashboard()
        rv = self.make_request('get', '/api/dashboards/{0}'.format(d1.slug))
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(rv.status_code, 200)

        expected = serialize_dashboard(d1, with_widgets=True, with_favorite_state=False)
        actual = json_loads(rv.data)

        self.assertResponseEqual(expected, actual)
开发者ID:ariarijp,项目名称:redash,代码行数:9,代码来源:test_dashboards.py


示例13: to_dict

 def to_dict(self):
     """Return this object's fields as a plain dict.

     ``data`` is JSON-decoded from ``self.data``; ``query`` carries
     ``self.query_text``. All other entries are copied from the attribute
     of the same name.
     """
     parsed_data = json_loads(self.data)

     serialized = {
         'id': self.id,
         'query_hash': self.query_hash,
         'query': self.query_text,
         'data': parsed_data,
         'data_source_id': self.data_source_id,
         'runtime': self.runtime,
         'retrieved_at': self.retrieved_at
     }
     return serialized
开发者ID:ariarijp,项目名称:redash,代码行数:10,代码来源:__init__.py


示例14: make_csv_content

    def make_csv_content(self):
        """Render the JSON result stored in ``self.data`` as CSV text.

        The header row uses the ``name`` of each entry in the result's
        ``columns``; row keys that are not among those names are ignored.
        """
        buf = cStringIO.StringIO()

        query_data = json_loads(self.data)
        column_names = [col['name'] for col in query_data['columns']]

        writer = csv.DictWriter(buf, extrasaction="ignore", fieldnames=column_names)
        # Swap in the project's UnicodeWriter as the underlying row writer.
        writer.writer = utils.UnicodeWriter(buf)

        writer.writeheader()
        for record in query_data['rows']:
            writer.writerow(record)

        return buf.getvalue()
开发者ID:ariarijp,项目名称:redash,代码行数:11,代码来源:__init__.py


示例15: _load_result

def _load_result(query_id):
    """Return the decoded latest result of the query *query_id*.

    The query is looked up within the current organization. If it has no
    data source, the request is aborted with status 400.
    """
    # Imported locally, as in the original.
    # NOTE(review): presumably to avoid a circular import; confirm.
    from redash.authentication.org_resolving import current_org
    from redash import models

    query = models.Query.get_by_id_and_org(query_id, current_org)

    if not query.data_source:
        abort(400, message="This query is detached from any data source. Please select a different query.")
    else:
        latest = models.QueryResult.get_by_id_and_org(query.latest_query_data_id, current_org)
        return json_loads(latest.data)
开发者ID:getredash,项目名称:redash,代码行数:11,代码来源:parameterized_query.py


示例16: get_query_results

def get_query_results(user, query_id, bring_from_cache):
    """Return the decoded results of query *query_id*.

    With *bring_from_cache* truthy, the query's stored latest result is
    used and an ``Exception`` is raised if none exists. Otherwise the query
    text is executed through the data source's query runner and an
    ``Exception`` is raised if the runner reports an error.
    """
    query = _load_query(user, query_id)

    if not bring_from_cache:
        runner = query.data_source.query_runner
        results, error = runner.run_query(query.query_text, user)
        if error:
            raise Exception("Failed loading results for query id {}.".format(query.id))
    elif query.latest_query_data_id is None:
        raise Exception("No cached result available for query {}.".format(query.id))
    else:
        results = query.latest_query_data.data

    return json_loads(results)
开发者ID:getredash,项目名称:redash,代码行数:13,代码来源:query_results.py


示例17: _get_bigquery_service

    def _get_bigquery_service(self):
        """Build an authorized BigQuery v2 service client.

        Credentials come from the base64-encoded JSON key stored under
        ``jsonKeyFile`` in the configuration. The HTTP client uses the
        timeout from ``settings.BIGQUERY_HTTP_TIMEOUT``.
        """
        scope = [
            "https://www.googleapis.com/auth/bigquery",
            "https://www.googleapis.com/auth/drive"
        ]

        encoded_key = self.configuration['jsonKeyFile']
        key = json_loads(b64decode(encoded_key))
        creds = ServiceAccountCredentials.from_json_keyfile_dict(key, scope)

        plain_http = httplib2.Http(timeout=settings.BIGQUERY_HTTP_TIMEOUT)
        authorized_http = creds.authorize(plain_http)

        return build("bigquery", "v2", http=authorized_http)
开发者ID:getredash,项目名称:redash,代码行数:13,代码来源:big_query.py


示例18: _get_spreadsheet_service

    def _get_spreadsheet_service(self):
        """Build a logged-in gspread client.

        Credentials come from the base64-encoded JSON key stored under
        ``jsonKeyFile`` in the configuration. The client's HTTP session is
        given a ``TimeoutSession`` as its underlying requests session.
        """
        scope = [
            'https://spreadsheets.google.com/feeds',
        ]

        encoded_key = self.configuration['jsonKeyFile']
        key = json_loads(b64decode(encoded_key))
        creds = ServiceAccountCredentials.from_json_keyfile_dict(key, scope)

        http_session = HTTPSession()
        http_session.requests_session = TimeoutSession()

        client = gspread.Client(auth=creds, http_session=http_session)
        client.login()
        return client
开发者ID:jonyboy2000,项目名称:redash,代码行数:13,代码来源:google_spreadsheets.py


示例19: _get_tables

    def _get_tables(self, schema):
        """Fill *schema* with the columns of every table in the database.

        Table names are read from ``sqlite_master``; the columns of each
        table are then read with ``PRAGMA table_info``. Any existing entry
        in *schema* for a table is replaced. *schema* is mutated in place.

        Returns ``schema.values()``. Raises ``Exception`` if any query
        reports an error.
        """
        query_table = "select tbl_name from sqlite_master where type='table'"
        # The table name is passed as a double-quoted identifier so names
        # containing spaces, dashes or reserved words remain valid.
        query_columns = 'PRAGMA table_info("%s")'

        results, error = self.run_query(query_table, None)

        if error is not None:
            raise Exception("Failed getting schema.")

        results = json_loads(results)

        for row in results['rows']:
            table_name = row['tbl_name']
            schema[table_name] = {'name': table_name, 'columns': []}

            # A double quote inside a quoted identifier is escaped by doubling it.
            quoted_name = table_name.replace('"', '""')
            results_table, error = self.run_query(query_columns % (quoted_name,), None)
            if error is not None:
                raise Exception("Failed getting schema.")

            results_table = json_loads(results_table)
            for row_column in results_table['rows']:
                schema[table_name]['columns'].append(row_column['name'])

        return schema.values()
开发者ID:ariarijp,项目名称:redash,代码行数:23,代码来源:sqlite.py


示例20: run_query

    def run_query(self, query, user):
        """Run a JSON-encoded search request and return ``(json_data, error)``.

        *query* is JSON text. Its optional ``index`` (default ``""``) and
        ``result_fields`` keys are removed, and the remaining keys are sent
        as the JSON body of a GET request to ``<server_url>/<index>/_search``.
        The index mappings are fetched first and used when parsing the
        response.

        On success ``json_data`` is the serialized ``columns``/``rows``
        result and ``error`` is ``None``; on failure ``json_data`` is
        ``None`` and ``error`` describes the problem.

        ``user`` is not used by this method.
        """
        # Bound up front so the HTTPError handler can tell whether the search
        # request was ever issued.
        r = None
        try:
            error = None

            logger.debug(query)
            query_dict = json_loads(query)

            index_name = query_dict.pop("index", "")
            result_fields = query_dict.pop("result_fields", None)

            if not self.server_url:
                error = "Missing configuration key 'server'"
                return None, error

            url = "{0}/{1}/_search".format(self.server_url, index_name)
            mapping_url = "{0}/{1}/_mapping".format(self.server_url, index_name)

            mappings, error = self._get_query_mappings(mapping_url)
            if error:
                return None, error

            logger.debug("Using URL: %s", url)
            logger.debug("Using query: %s", query_dict)
            r = requests.get(url, json=query_dict, auth=self.auth)
            r.raise_for_status()
            logger.debug("Result: %s", r.json())

            result_columns = []
            result_rows = []
            self._parse_results(mappings, result_fields, r.json(), result_columns, result_rows)

            json_data = json_dumps({
                "columns": result_columns,
                "rows": result_rows
            })
        except KeyboardInterrupt as e:
            # Bind the exception: the handler previously logged an undefined
            # name and failed with NameError instead of reporting cancellation.
            logger.exception(e)
            error = "Query cancelled by user."
            json_data = None
        except requests.HTTPError as e:
            logger.exception(e)
            # Compare with None explicitly: a Response is falsy for error
            # statuses. Fall back to the response carried by the exception
            # when the error was raised before ``r`` was assigned.
            response = r if r is not None else e.response
            error = "Failed to execute query. Return Code: {0}   Reason: {1}".format(
                getattr(response, 'status_code', None),
                getattr(response, 'text', e))
            json_data = None
        except requests.exceptions.RequestException as e:
            logger.exception(e)
            error = "Connection refused"
            json_data = None

        return json_data, error
开发者ID:ariarijp,项目名称:redash,代码行数:49,代码来源:elasticsearch.py



注:本文中的redash.utils.json_loads函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.utcnow函数代码示例发布时间:2022-05-26
下一篇:
Python utils.json_dumps函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap