本文整理汇总了Python中redash.utils.json_dumps函数的典型用法代码示例。如果您正苦于以下问题:Python json_dumps函数的具体用法?Python json_dumps怎么用?Python json_dumps使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了json_dumps函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: embed
def embed(query_id, visualization_id):
query = models.Query.get_by_id(query_id)
vis = query.visualizations.where(models.Visualization.id == visualization_id).first()
qr = {}
if vis is not None:
vis = vis.to_dict()
qr = query.latest_query_data
if qr is None:
abort(400, message="No Results for this query")
else:
qr = qr.to_dict()
else:
abort(404, message="Visualization not found.")
client_config = {}
client_config.update(settings.COMMON_CLIENT_CONFIG)
return render_template(
"embed.html",
name=settings.NAME,
client_config=json_dumps(client_config),
visualization=json_dumps(vis),
query_result=json_dumps(qr),
analytics=settings.ANALYTICS,
)
开发者ID:johnkabler,项目名称:redash,代码行数:26,代码来源:embed.py
示例2: embed
def embed(query_id, visualization_id, org_slug=None):
# TODO: add event for embed access
query = models.Query.get_by_id_and_org(query_id, current_org)
vis = query.visualizations.where(models.Visualization.id == visualization_id).first()
qr = {}
if vis is not None:
vis = vis.to_dict()
qr = query.latest_query_data
if qr is None:
abort(400, message="No Results for this query")
else:
qr = qr.to_dict()
else:
abort(404, message="Visualization not found.")
client_config = {}
client_config.update(settings.COMMON_CLIENT_CONFIG)
if settings.MULTI_ORG:
base_href = url_for('index', _external=True, org_slug=current_org.slug)
else:
base_href = url_for('index', _external=True)
return render_template("embed.html",
name=settings.NAME,
base_href=base_href,
client_config=json_dumps(client_config),
visualization=json_dumps(vis),
query_result=json_dumps(qr),
analytics=settings.ANALYTICS)
开发者ID:kazuhisam3,项目名称:redash,代码行数:31,代码来源:embed.py
示例3: embed
def embed(query_id, visualization_id, org_slug=None):
# TODO: add event for embed access
query = models.Query.get_by_id_and_org(query_id, current_org)
require_access(query.groups, current_user, view_only)
vis = query.visualizations.where(models.Visualization.id == visualization_id).first()
qr = {}
if vis is not None:
vis = vis.to_dict()
qr = query.latest_query_data
if qr is None:
abort(400, message="No Results for this query")
else:
qr = qr.to_dict()
else:
abort(404, message="Visualization not found.")
client_config = {}
client_config.update(settings.COMMON_CLIENT_CONFIG)
qr = project(qr, ('data', 'id', 'retrieved_at'))
vis = project(vis, ('description', 'name', 'id', 'options', 'query', 'type', 'updated_at'))
vis['query'] = project(vis, ('created_at', 'description', 'name', 'id', 'latest_query_data_id', 'name', 'updated_at'))
return render_template("embed.html",
name=settings.NAME,
base_href=base_href(),
client_config=json_dumps(client_config),
visualization=json_dumps(vis),
query_result=json_dumps(qr),
analytics=settings.ANALYTICS)
开发者ID:snirad,项目名称:redash,代码行数:31,代码来源:embed.py
示例4: embed
def embed(query_id, visualization_id, org_slug=None):
query = models.Query.get_by_id_and_org(query_id, current_org)
require_access(query.groups, current_user, view_only)
vis = query.visualizations.where(models.Visualization.id == visualization_id).first()
qr = {}
parameter_values = collect_parameters_from_request(request.args)
if vis is not None:
vis = vis.to_dict()
qr = query.latest_query_data
logging.info("jonhere")
logging.info( settings.ALLOW_PARAMETERS_IN_EMBEDS)
if settings.ALLOW_PARAMETERS_IN_EMBEDS == True and len(parameter_values) > 0:
#abort(404,message="jlk")
# run parameterized query
#
# WARNING: Note that the external query parameters
# are a potential risk of SQL injections.
#
results = run_query_sync(query.data_source, parameter_values, query.query)
logging.info("jonhere2")
logging.info("results")
if results is None:
abort(400, message="Unable to get results for this query")
else:
qr = {"data": json.loads(results)}
elif qr is None:
abort(400, message="No Results for this query")
else:
qr = qr.to_dict()
else:
abort(404, message="Visualization not found.")
record_event(current_org, current_user, {
'action': 'view',
'object_id': visualization_id,
'object_type': 'visualization',
'query_id': query_id,
'embed': True,
'referer': request.headers.get('Referer')
})
client_config = {}
client_config.update(settings.COMMON_CLIENT_CONFIG)
qr = project(qr, ('data', 'id', 'retrieved_at'))
vis = project(vis, ('description', 'name', 'id', 'options', 'query', 'type', 'updated_at'))
vis['query'] = project(vis['query'], ('created_at', 'description', 'name', 'id', 'latest_query_data_id', 'name', 'updated_at'))
return render_template("embed.html",
client_config=json_dumps(client_config),
visualization=json_dumps(vis),
query_result=json_dumps(qr))
开发者ID:solutionrooms,项目名称:testredash,代码行数:54,代码来源:embed.py
示例5: run_query
def run_query(self, query, user):
connection = phoenixdb.connect(
url=self.configuration.get('url', ''),
autocommit=True)
cursor = connection.cursor()
try:
cursor.execute(query)
column_tuples = [(i[0], TYPES_MAPPING.get(i[1], None)) for i in cursor.description]
columns = self.fetch_columns(column_tuples)
rows = [dict(zip(([c['name'] for c in columns]), r)) for i, r in enumerate(cursor.fetchall())]
data = {'columns': columns, 'rows': rows}
json_data = json_dumps(data)
error = None
cursor.close()
except Error as e:
json_data = None
error = 'code: {}, sql state:{}, message: {}'.format(e.code, e.sqlstate, e.message)
except (KeyboardInterrupt, InterruptException) as e:
error = "Query cancelled by user."
json_data = None
except Exception as ex:
json_data = None
error = unicode(ex)
finally:
if connection:
connection.close()
return json_data, error
开发者ID:ariarijp,项目名称:redash,代码行数:30,代码来源:phoenix.py
示例6: make_request
def make_request(self, method, path, org=None, user=None, data=None,
is_json=True):
if user is None:
user = self.factory.user
if org is None:
org = self.factory.org
if org is not False:
path = "/{}{}".format(org.slug, path)
if user:
authenticate_request(self.client, user)
method_fn = getattr(self.client, method.lower())
headers = {}
if data and is_json:
data = json_dumps(data)
if is_json:
content_type = 'application/json'
else:
content_type = None
response = method_fn(path, data=data, headers=headers, content_type=content_type)
if response.data and is_json:
response.json = json.loads(response.data)
return response
开发者ID:ezioavi42,项目名称:redash,代码行数:31,代码来源:__init__.py
示例7: run_query
def run_query(self, query, user):
connection = sqlite3.connect(self._dbpath)
cursor = connection.cursor()
try:
cursor.execute(query)
if cursor.description is not None:
columns = self.fetch_columns([(i[0], None) for i in cursor.description])
rows = [dict(zip((c['name'] for c in columns), row)) for row in cursor]
data = {'columns': columns, 'rows': rows}
error = None
json_data = json_dumps(data)
else:
error = 'Query completed but it returned no data.'
json_data = None
except KeyboardInterrupt:
connection.cancel()
error = "Query cancelled by user."
json_data = None
except Exception as e:
# handle unicode error message
err_class = sys.exc_info()[1].__class__
err_args = [arg.decode('utf-8') for arg in sys.exc_info()[1].args]
unicode_err = err_class(*err_args)
reraise(unicode_err, None, sys.exc_info()[2])
finally:
connection.close()
return json_data, error
开发者ID:ariarijp,项目名称:redash,代码行数:31,代码来源:sqlite.py
示例8: notify
def notify(self, alert, query, user, new_state, app, host, options):
# Documentation: https://api.slack.com/docs/attachments
fields = [
{
"title": "Query",
"value": "{host}/queries/{query_id}".format(host=host, query_id=query.id),
"short": True
},
{
"title": "Alert",
"value": "{host}/alerts/{alert_id}".format(host=host, alert_id=alert.id),
"short": True
}
]
if new_state == "triggered":
text = alert.name + " just triggered"
color = "#c0392b"
else:
text = alert.name + " went back to normal"
color = "#27ae60"
payload = {'attachments': [{'text': text, 'color': color, 'fields': fields}]}
if options.get('username'): payload['username'] = options.get('username')
if options.get('icon_emoji'): payload['icon_emoji'] = options.get('icon_emoji')
if options.get('icon_url'): payload['icon_url'] = options.get('icon_url')
if options.get('channel'): payload['channel'] = options.get('channel')
try:
resp = requests.post(options.get('url'), data=json_dumps(payload), timeout=5.0)
logging.warning(resp.text)
if resp.status_code != 200:
logging.error("Slack send ERROR. status_code => {status}".format(status=resp.status_code))
except Exception:
logging.exception("Slack send ERROR.")
开发者ID:ariarijp,项目名称:redash,代码行数:35,代码来源:slack.py
示例9: post
def post(self):
"""
Add a widget to a dashboard.
:<json number dashboard_id: The ID for the dashboard being added to
:<json visualization_id: The ID of the visualization to put in this widget
:<json object options: Widget options
:<json string text: Text box contents
:<json number width: Width for widget display
:>json object widget: The created widget
"""
widget_properties = request.get_json(force=True)
dashboard = models.Dashboard.get_by_id_and_org(widget_properties.pop('dashboard_id'), self.current_org)
require_object_modify_permission(dashboard, self.current_user)
widget_properties['options'] = json_dumps(widget_properties['options'])
widget_properties.pop('id', None)
widget_properties['dashboard'] = dashboard
visualization_id = widget_properties.pop('visualization_id')
if visualization_id:
visualization = models.Visualization.get_by_id_and_org(visualization_id, self.current_org)
require_access(visualization.query_rel, self.current_user, view_only)
else:
visualization = None
widget_properties['visualization'] = visualization
widget = models.Widget(**widget_properties)
models.db.session.add(widget)
models.db.session.commit()
models.db.session.commit()
return serialize_widget(widget)
开发者ID:ariarijp,项目名称:redash,代码行数:35,代码来源:widgets.py
示例10: run_query
def run_query(self, query, user):
connection = atsd_client.connect_url(self.url,
self.configuration.get('username'),
self.configuration.get('password'),
verify=self.configuration.get('trust_certificate', False),
timeout=self.configuration.get('timeout', 600))
sql = SQLService(connection)
query_id = str(uuid.uuid4())
try:
logger.debug("SQL running query: %s", query)
data = sql.query_with_params(query, {'outputFormat': 'csv', 'metadataFormat': 'EMBED',
'queryId': query_id})
columns, rows = generate_rows_and_columns(data)
data = {'columns': columns, 'rows': rows}
json_data = json_dumps(data)
error = None
except SQLException as e:
json_data = None
error = e.content
except (KeyboardInterrupt, InterruptException):
sql.cancel_query(query_id)
error = "Query cancelled by user."
json_data = None
return json_data, error
开发者ID:ariarijp,项目名称:redash,代码行数:29,代码来源:axibase_tsd.py
示例11: public_dashboard
def public_dashboard(token, org_slug=None):
# TODO: verify object is a dashboard?
if not isinstance(current_user, models.ApiUser):
api_key = models.ApiKey.get_by_api_key(token)
dashboard = api_key.object
else:
dashboard = current_user.object
user = {
'permissions': [],
'apiKey': current_user.id
}
headers = {
'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate'
}
record_event(current_org, current_user, {
'action': 'view',
'object_id': dashboard.id,
'object_type': 'dashboard',
'public': True,
'headless': 'embed' in request.args,
'referer': request.headers.get('Referer')
})
response = render_template("public.html",
headless='embed' in request.args,
user=json.dumps(user),
seed_data=json_dumps({
'dashboard': serializers.public_dashboard(dashboard)
}),
client_config=json.dumps(settings.COMMON_CLIENT_CONFIG))
return response, 200, headers
开发者ID:alexsukstansky,项目名称:redash,代码行数:35,代码来源:embed.py
示例12: run_query
def run_query(self, query, user):
connection = snowflake.connector.connect(
user=self.configuration['user'],
password=self.configuration['password'],
account=self.configuration['account'],
)
cursor = connection.cursor()
try:
cursor.execute("USE WAREHOUSE {}".format(self.configuration['warehouse']))
cursor.execute("USE {}".format(self.configuration['database']))
cursor.execute(query)
columns = self.fetch_columns([(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description])
rows = [dict(zip((c['name'] for c in columns), row)) for row in cursor]
data = {'columns': columns, 'rows': rows}
error = None
json_data = json_dumps(data)
finally:
cursor.close()
connection.close()
return json_data, error
开发者ID:13768324554,项目名称:redash,代码行数:26,代码来源:snowflake.py
示例13: json_representation
def json_representation(data, code, headers=None):
# Flask-Restful checks only for flask.Response but flask-login uses werkzeug.wrappers.Response
if isinstance(data, Response):
return data
resp = make_response(json_dumps(data), code)
resp.headers.extend(headers or {})
return resp
开发者ID:Captricity,项目名称:redash,代码行数:7,代码来源:api.py
示例14: run_query
def run_query(self, query, user):
connection = self._get_connection()
_wait(connection, timeout=10)
cursor = connection.cursor()
try:
cursor.execute(query)
_wait(connection)
if cursor.description is not None:
columns = self.fetch_columns([(i[0], types_map.get(i[1], None)) for i in cursor.description])
rows = [dict(zip((c['name'] for c in columns), row)) for row in cursor]
data = {'columns': columns, 'rows': rows}
error = None
json_data = json_dumps(data, ignore_nan=True)
else:
error = 'Query completed but it returned no data.'
json_data = None
except (select.error, OSError) as e:
error = "Query interrupted. Please retry."
json_data = None
except psycopg2.DatabaseError as e:
error = e.message
json_data = None
except (KeyboardInterrupt, InterruptException):
connection.cancel()
error = "Query cancelled by user."
json_data = None
finally:
connection.close()
return json_data, error
开发者ID:rohithreddy,项目名称:redash,代码行数:34,代码来源:pg.py
示例15: run_query
def run_query(self, query, user):
connection = None
try:
if self.configuration.get('username', '') and self.configuration.get('password', ''):
auth_provider = PlainTextAuthProvider(username='{}'.format(self.configuration.get('username', '')),
password='{}'.format(self.configuration.get('password', '')))
connection = Cluster([self.configuration.get('host', '')],
auth_provider=auth_provider,
port=self.configuration.get('port', ''),
protocol_version=self.configuration.get('protocol', 3))
else:
connection = Cluster([self.configuration.get('host', '')],
port=self.configuration.get('port', ''),
protocol_version=self.configuration.get('protocol', 3))
session = connection.connect()
session.set_keyspace(self.configuration['keyspace'])
session.default_timeout = self.configuration.get('timeout', 10)
logger.debug("Cassandra running query: %s", query)
result = session.execute(query)
column_names = result.column_names
columns = self.fetch_columns(map(lambda c: (c, 'string'), column_names))
rows = [dict(zip(column_names, row)) for row in result]
data = {'columns': columns, 'rows': rows}
json_data = json_dumps(data, cls=CassandraJSONEncoder)
error = None
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None
return json_data, error
开发者ID:ariarijp,项目名称:redash,代码行数:35,代码来源:cass.py
示例16: run_query
def run_query(self, query, user):
logger.debug("BigQuery got query: %s", query)
bigquery_service = self._get_bigquery_service()
jobs = bigquery_service.jobs()
try:
if "totalMBytesProcessedLimit" in self.configuration:
limitMB = self.configuration["totalMBytesProcessedLimit"]
processedMB = self._get_total_bytes_processed(jobs, query) / 1000.0 / 1000.0
if limitMB < processedMB:
return None, "Larger than %d MBytes will be processed (%f MBytes)" % (limitMB, processedMB)
data = self._get_query_result(jobs, query)
error = None
json_data = json_dumps(data, ignore_nan=True)
except apiclient.errors.HttpError as e:
json_data = None
if e.resp.status == 400:
error = json_loads(e.content)['error']['message']
else:
error = e.content
except KeyboardInterrupt:
error = "Query cancelled by user."
json_data = None
return json_data, error
开发者ID:getredash,项目名称:redash,代码行数:28,代码来源:big_query.py
示例17: public_dashboard
def public_dashboard(token, org_slug=None):
# TODO: verify object is a dashboard?
if not isinstance(current_user, models.ApiUser):
api_key = models.ApiKey.get_by_api_key(token)
dashboard = api_key.object
else:
dashboard = current_user.object
user = {
'permissions': [],
'apiKey': current_user.id
}
headers = {
'Cache-Control': 'no-cache, no-store, max-age=0, must-revalidate'
}
response = render_template("public.html",
headless='embed' in request.args,
user=json.dumps(user),
seed_data=json_dumps({
'dashboard': serializers.public_dashboard(dashboard)
}),
base_href=base_href(),
name=settings.NAME,
client_config=json.dumps(settings.COMMON_CLIENT_CONFIG),
analytics=settings.ANALYTICS)
return response, 200, headers
开发者ID:snirad,项目名称:redash,代码行数:29,代码来源:embed.py
示例18: run_query
def run_query(self, query, user):
logger.debug("Metrica is about to execute query: %s", query)
data = None
query = query.strip()
if query == "":
error = "Query is empty"
return data, error
try:
params = yaml.safe_load(query)
except ValueError as e:
logging.exception(e)
error = unicode(e)
return data, error
if isinstance(params, dict):
if 'url' in params:
params = parse_qs(urlparse(params['url']).query, keep_blank_values=True)
else:
error = 'The query format must be JSON or YAML'
return data, error
try:
data = json_dumps(parse_ym_response(self._send_query(**params)))
error = None
except Exception as e:
logging.exception(e)
error = unicode(e)
return data, error
开发者ID:jonyboy2000,项目名称:redash,代码行数:28,代码来源:yandex_metrica.py
示例19: run_query
def run_query(self, query, user):
url = self.configuration['url']
kylinuser = self.configuration['user']
kylinpass = self.configuration['password']
kylinproject = self.configuration['project']
resp = requests.post(
os.path.join(url, "api/query"),
auth=HTTPBasicAuth(kylinuser, kylinpass),
json={
"sql": query,
"offset": settings.KYLIN_OFFSET,
"limit": settings.KYLIN_LIMIT,
"acceptPartial": settings.KYLIN_ACCEPT_PARTIAL,
"project": kylinproject
}
)
if not resp.ok:
return {}, resp.text or str(resp.reason)
data = resp.json()
columns = self.get_columns(data['columnMetas'])
rows = self.get_rows(columns, data['results'])
return json_dumps({'columns': columns, 'rows': rows}), None
开发者ID:ariarijp,项目名称:redash,代码行数:26,代码来源:kylin.py
示例20: test_supports_relative_timestamps
def test_supports_relative_timestamps(self):
query = {
'ts': {'$humanTime': '1 hour ago'}
}
one_hour_ago = parse_human_time("1 hour ago")
query_data = parse_query_json(json_dumps(query))
self.assertEqual(query_data['ts'], one_hour_ago)
开发者ID:ariarijp,项目名称:redash,代码行数:8,代码来源:test_mongodb.py
注:本文中的redash.utils.json_dumps函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论