本文整理汇总了Python中sqlparse.split函数的典型用法代码示例。如果您正苦于以下问题:Python split函数的具体用法?Python split怎么用?Python split使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了split函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_psql_quotation_marks
def test_psql_quotation_marks():
# issue83
# regression: make sure plain $$ work
t = sqlparse.split(
"""
CREATE OR REPLACE FUNCTION testfunc1(integer) RETURNS integer AS $$
....
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION testfunc2(integer) RETURNS integer AS $$
....
$$ LANGUAGE plpgsql;"""
)
assert len(t) == 2
# make sure $SOMETHING$ works too
t = sqlparse.split(
"""
CREATE OR REPLACE FUNCTION testfunc1(integer) RETURNS integer AS $PROC_1$
....
$PROC_1$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION testfunc2(integer) RETURNS integer AS $PROC_2$
....
$PROC_2$ LANGUAGE plpgsql;"""
)
assert len(t) == 2
开发者ID:cloudera,项目名称:hue,代码行数:26,代码来源:test_parse.py
示例2: test_split_quotes_with_new_line
def test_split_quotes_with_new_line():
stmts = sqlparse.split('select "foo\nbar"')
assert len(stmts) == 1
assert stmts[0] == 'select "foo\nbar"'
stmts = sqlparse.split("select 'foo\n\bar'")
assert len(stmts) == 1
assert stmts[0] == "select 'foo\n\bar'"
开发者ID:tonyzhimin,项目名称:sqlparse,代码行数:8,代码来源:test_split.py
示例3: sqlRstToBlockSequence
def sqlRstToBlockSequence(sqlText):
def __tryMatchCommentBlock(text):
"""
:param text: the text to parse
:return: (CommentBlock|None,str). The block matched and the rest
of the string or None and the text.
"""
m = re.match(BLOCK_COMMENT_BLOCK, text, re.MULTILINE | re.DOTALL)
if m:
return (
structure.BlockCommentBlock(m.group("comment"), m.group("before"), m.group("after")),
m.group("rest"),
)
m = re.match(LINES_COMMENT_BLOCK, text, re.MULTILINE | re.DOTALL)
if m:
return (
structure.LinesCommentBlock(m.group("comment"), m.group("before"), m.group("after")),
m.group("rest"),
)
return (None, text)
def __doMatchSQLStatement(text):
m = re.match(CREATE_TABLE_RE + ".*", text, re.IGNORECASE)
if m:
return structure.CreateTableStatementBlock(text, name=m.group("name"))
m = re.match(CREATE_VIEW_RE + ".*", text, re.IGNORECASE)
if m:
return structure.CreateViewStatementBlock(text, name=m.group("name"))
m = re.match(SELECT_RE + ".*", text, re.IGNORECASE)
if m:
return structure.SelectStatementBlock(text)
return structure.UnknownStatementBlock(text)
blocks = []
# Split the text in (comments* +sqlstatement restofline) segments.
# Spaces+lines before comments are removed, spaces after blocks
# are removed. some blank lines may disappear in the process.
segments = sqlparse.split(sqlText)
# Process each segment as they still may contains some comments
# additionaly to one (or more?) sql statements
for segment in segments:
rest = segment
# try to get all comments before the sql statement(s?)
(comment_block, rest) = __tryMatchCommentBlock(rest)
while comment_block is not None: # and not re.match('\s*',rest, re.MULTILINE):
blocks.append(comment_block)
(comment_block, rest) = __tryMatchCommentBlock(rest)
# a priori there is just one remaining sql block,
# but just to be sure...
stmt_texts = sqlparse.split(rest)
for stmt_text in stmt_texts:
blocks.append(__doMatchSQLStatement(stmt_text))
return blocks
开发者ID:ScribesZone,项目名称:ScribesInfra,代码行数:58,代码来源:parser.py
示例4: exec_impala_query_from_file
def exec_impala_query_from_file(file_name):
"""Execute each query in an Impala query file individually"""
if not os.path.exists(file_name):
LOG.info("Error: File {0} not found".format(file_name))
return False
LOG.info("Beginning execution of impala SQL: {0}".format(file_name))
is_success = True
impala_client = ImpalaBeeswaxClient(options.impalad, use_kerberos=options.use_kerberos)
output_file = file_name + ".log"
with open(output_file, 'w') as out_file:
try:
impala_client.connect()
with open(file_name, 'r+') as query_file:
queries = sqlparse.split(query_file.read())
for query in queries:
query = sqlparse.format(query.rstrip(';'), strip_comments=True)
if query.strip() != "":
result = impala_client.execute(query)
out_file.write("{0}\n{1}\n".format(query, result))
except Exception as e:
out_file.write("ERROR: {0}\n".format(query))
traceback.print_exc(file=out_file)
is_success = False
if is_success:
LOG.info("Finished execution of impala SQL: {0}".format(file_name))
else:
LOG.info("Error executing impala SQL: {0} See: {1}".format(file_name, \
output_file))
return is_success
开发者ID:jbapple-cloudera,项目名称:incubator-impala,代码行数:32,代码来源:load-data.py
示例5: executeSQL
def executeSQL(connection,command,parent_conn = None):
''' command is a sequence of SQL commands
separated by ";" and possibly "\n"
connection is a MySQLdb connection
returns the output from the last command
in the sequence
'''
#split commands by \n
commands = command.split("\n")
#remove comments and whitespace"
commands = [x for x in commands if x.lstrip()[0:2] != '--']
commands = [re.sub('\r','',x) for x in commands if x.lstrip() != '\r']
command = ' '.join(commands)
statements = sqlparse.split(command)
count = 0
for statement in statements:
cur = connection.cursor()
#make sure actually does something
if sqlparse.parse(statement):
cur.execute(statement)
cur.close()
connection.commit()
if parent_conn:
parent_conn.send(True)
return True
开发者ID:sebboyer,项目名称:DigitalLearnerQuantified,代码行数:26,代码来源:sql_functions.py
示例6: explain
def explain(self):
self.results.assure_visible()
buf = self.textview.get_buffer()
bounds = buf.get_selection_bounds()
if not bounds:
bounds = buf.get_bounds()
statement = buf.get_text(*bounds)
if len(sqlparse.split(statement)) > 1:
dialogs.error(_(u"Select a single statement to explain."))
return
if not self.connection:
return
queries = [Query(stmt, self.connection)
for stmt in self.connection.explain_statements(statement)]
def _execute_next(last, queries):
if last is not None and last.failed:
self.results.set_explain_results(last)
return
q = queries.pop(0)
if len(queries) == 0:
q.connect('finished',
lambda x: self.results.set_explain_results(x))
else:
q.connect('finished',
lambda x: _execute_next(x, queries))
q.execute()
_execute_next(None, queries)
开发者ID:kleopatra999,项目名称:crunchyfrog,代码行数:27,代码来源:editor.py
示例7: get_schema
def get_schema(self, name):
fp = self.load_file(name, 'schemas')
query = fp.read()
# queries = query.split(';')
queries = sqlparse.split(query)
ret_queries = [q.strip() for q in queries if q.strip()]
return ret_queries
开发者ID:samupl,项目名称:sql-benchmark,代码行数:7,代码来源:query_loader.py
示例8: loadDatabaseSchema
def loadDatabaseSchema(dumpFileName, ignoreSlonyTriggers = False):
database = PgDatabase()
logging.debug('Loading %s file' % dumpFileName)
statements = sqlparse.split(open(dumpFileName,'r'))
logging.debug('Parsed %d statements' % len(statements))
for statement in statements:
statement = PgDumpLoader.strip_comment(statement).strip()
if PgDumpLoader.PATTERN_CREATE_SCHEMA.match(statement):
CreateSchemaParser.parse(database, statement)
continue
match = PgDumpLoader.PATTERN_DEFAULT_SCHEMA.match(statement)
if match:
database.setDefaultSchema(match.group(1))
continue
if PgDumpLoader.PATTERN_CREATE_TABLE.match(statement):
CreateTableParser.parse(database, statement)
continue
if PgDumpLoader.PATTERN_ALTER_TABLE.match(statement):
AlterTableParser.parse(database, statement)
continue
if PgDumpLoader.PATTERN_CREATE_SEQUENCE.match(statement):
CreateSequenceParser.parse(database, statement)
continue
if PgDumpLoader.PATTERN_ALTER_SEQUENCE.match(statement):
AlterSequenceParser.parse(database, statement)
continue
if PgDumpLoader.PATTERN_CREATE_INDEX.match(statement):
CreateIndexParser.parse(database, statement)
continue
if PgDumpLoader.PATTERN_CREATE_VIEW.match(statement):
CreateViewParser.parse(database, statement)
continue
if PgDumpLoader.PATTERN_ALTER_VIEW.match(statement):
AlterViewParser.parse(database, statement)
continue
if PgDumpLoader.PATTERN_CREATE_TRIGGER.match(statement):
CreateTriggerParser.parse(database, statement, ignoreSlonyTriggers)
continue
if PgDumpLoader.PATTERN_CREATE_FUNCTION.match(statement):
CreateFunctionParser.parse(database, statement)
continue
if PgDumpLoader.PATTERN_COMMENT.match(statement):
CommentParser.parse(database, statement)
continue
logging.info('Ignored statement: %s' % statement)
return database
开发者ID:Dancer3809,项目名称:PgDiffPy,代码行数:60,代码来源:PgDumpLoader.py
示例9: test_if_function
def test_if_function(self): # see issue 33
# don't let IF as a function confuse the splitter
sql = ('CREATE TEMPORARY TABLE tmp '
'SELECT IF(a=1, a, b) AS o FROM one; '
'SELECT t FROM two')
stmts = sqlparse.split(sql)
self.assertEqual(len(stmts), 2)
开发者ID:AndiDog,项目名称:sqlparse,代码行数:7,代码来源:test_split.py
示例10: run
def run(self, statement):
"""Execute the sql in the database and return the results. The results
are a list of tuples. Each tuple has 4 values
(title, rows, headers, status).
"""
# Remove spaces and EOL
statement = statement.strip()
if not statement: # Empty string
yield (None, None, None, None)
# Split the sql into separate queries and run each one.
for sql in sqlparse.split(statement):
# Remove spaces, eol and semi-colons.
sql = sql.rstrip(';')
# \G is treated specially since we have to set the expanded output
# and then proceed to execute the sql as normal.
if sql.endswith('\\G'):
special.set_expanded_output(True)
yield self.execute_normal_sql(sql.rsplit('\\G', 1)[0])
else:
try: # Special command
_logger.debug('Trying a dbspecial command. sql: %r', sql)
cur = self.conn.cursor()
for result in special.execute(cur, sql):
yield result
except special.CommandNotFound: # Regular SQL
yield self.execute_normal_sql(sql)
开发者ID:steverobbins,项目名称:mycli,代码行数:29,代码来源:sqlexecute.py
示例11: execute_statements
def execute_statements(cursor, statements):
"""Executes statements."""
statements = statements.strip(u'%s%s' % (string.whitespace, ';'))
statement_list = None
if statements:
statement_list = sqlparse.split(statements)
if not statements:
return
try:
for statement in statement_list:
statement = statement.rstrip(u'%s%s' % (string.whitespace, ';'))
if not statement:
continue
cursor.execute(statement)
while cursor.nextset() is not None:
pass
finally:
while cursor.nextset() is not None:
pass
开发者ID:emedez,项目名称:schemanizer,代码行数:26,代码来源:mysql_functions.py
示例12: execute_favorite_query
def execute_favorite_query(cur, arg, **_):
"""Returns (title, rows, headers, status)"""
if arg == '':
for result in list_favorite_queries():
yield result
"""Parse out favorite name and optional substitution parameters"""
name, _, arg_str = arg.partition(' ')
args = shlex.split(arg_str)
query = favoritequeries.get(name)
if query is None:
message = "No favorite query: %s" % (name)
yield (None, None, None, message)
else:
query, arg_error = subst_favorite_query_args(query, args)
if arg_error:
yield (None, None, None, arg_error)
else:
for sql in sqlparse.split(query):
sql = sql.rstrip(';')
title = '> %s' % (sql)
cur.execute(sql)
if cur.description:
headers = [x[0] for x in cur.description]
yield (title, cur, headers, None)
else:
yield (title, None, None, None)
开发者ID:dbcli,项目名称:mycli,代码行数:28,代码来源:iocommands.py
示例13: find_statements
def find_statements(self):
"""Finds statements in current buffer.
Returns:
List of 2-tuples (start iter, end iter).
"""
buffer_ = self.get_buffer()
buffer_start, buffer_end = buffer_.get_bounds()
content = buffer_.get_text(buffer_start, buffer_end)
iter_ = buffer_.get_start_iter()
for stmt in sqlparse.split(content):
if not stmt.strip():
continue
# FIXME: Does not work if linebreaks in buffers are not '\n'.
# sqlparse unifies them!
bounds = iter_.forward_search(stmt.strip(),
gtk.TEXT_SEARCH_TEXT_ONLY)
if bounds is None:
continue
start, end = bounds
yield start, end
iter_ = end
if not iter_:
raise StopIteration
raise StopIteration
开发者ID:kleopatra999,项目名称:crunchyfrog,代码行数:25,代码来源:__init__.py
示例14: run
def run(conn, sql, config, user_namespace):
if sql.strip():
for statement in sqlparse.split(sql):
first_word = sql.strip().split()[0].lower()
if first_word == 'begin':
raise Exception("ipython_sql does not support transactions")
if first_word.startswith('\\') and 'postgres' in str(conn.dialect):
pgspecial = PGSpecial()
_, cur, headers, _ = pgspecial.execute(
conn.session.connection.cursor(),
statement)[0]
result = FakeResultProxy(cur, headers)
else:
txt = sqlalchemy.sql.text(statement)
result = conn.session.execute(txt, user_namespace)
try:
# mssql has autocommit
if config.autocommit and ('mssql' not in str(conn.dialect)):
conn.session.execute('commit')
except sqlalchemy.exc.OperationalError:
pass # not all engines can commit
if result and config.feedback:
print(interpret_rowcount(result.rowcount))
resultset = ResultSet(result, statement, config)
if config.autopandas:
return resultset.DataFrame()
else:
return resultset
#returning only last result, intentionally
else:
return 'Connected: %s' % conn.name
开发者ID:gett-labs,项目名称:ipython-sql,代码行数:31,代码来源:run.py
示例15: run
def run(self, statement):
"""Execute the sql in the database and return the results. The results
are a list of tuples. Each tuple has 4 values
(title, rows, headers, status).
"""
# Remove spaces and EOL
statement = statement.strip()
if not statement: # Empty string
yield (None, None, None, None)
# Split the sql into separate queries and run each one.
# Unless it's saving a favorite query, in which case we
# want to save them all together.
if statement.startswith('\\fs'):
components = [statement]
else:
components = sqlparse.split(statement)
for sql in components:
# Remove spaces, eol and semi-colons.
sql = sql.rstrip(';')
# \G is treated specially since we have to set the expanded output.
if sql.endswith('\\G'):
special.set_expanded_output(True)
sql = sql[:-2].strip()
try: # Special command
_logger.debug('Trying a dbspecial command. sql: %r', sql)
cur = self.conn.cursor()
for result in special.execute(cur, sql):
yield result
except special.CommandNotFound: # Regular SQL
yield self.execute_normal_sql(sql)
开发者ID:spencervillars,项目名称:AutoSchema,代码行数:35,代码来源:sqlexecute.py
示例16: clean_reply
def clean_reply(self, reply, dataset):
solve_sql = reply.solve_sql.strip()
if not solve_sql:
raise FormatError("Empty query")
if len(sqlparse.split(solve_sql)) > 1:
raise FormatError("Only one query is allowed")
return solve_sql
开发者ID:StepicOrg,项目名称:stepic-plugins,代码行数:7,代码来源:__init__.py
示例17: run
def run(self, statement, pgspecial=None):
"""Execute the sql in the database and return the results.
:param statement: A string containing one or more sql statements
:param pgspecial: PGSpecial object
:return: List of tuples containing (title, rows, headers, status)
"""
# Remove spaces and EOL
statement = statement.strip()
if not statement: # Empty string
yield (None, None, None, None)
# Split the sql into separate queries and run each one.
for sql in sqlparse.split(statement):
# Remove spaces, eol and semi-colons.
sql = sql.rstrip(";")
if pgspecial:
# First try to run each query as special
try:
_logger.debug("Trying a pgspecial command. sql: %r", sql)
cur = self.conn.cursor()
for result in pgspecial.execute(cur, sql):
yield result
return
except special.CommandNotFound:
pass
yield self.execute_normal_sql(sql)
开发者ID:nosun,项目名称:pgcli,代码行数:30,代码来源:pgexecute.py
示例18: prepare_sql_script
def prepare_sql_script(self, sql, _allow_fallback=False):
"""
Takes a SQL script that may contain multiple lines and returns a list
of statements to feed to successive cursor.execute() calls.
Since few databases are able to process raw SQL scripts in a single
cursor.execute() call and PEP 249 doesn't talk about this use case,
the default implementation is conservative.
"""
# Remove _allow_fallback and keep only 'return ...' in Django 1.9.
try:
# This import must stay inside the method because it's optional.
import sqlparse
except ImportError:
if _allow_fallback:
# Without sqlparse, fall back to the legacy (and buggy) logic.
warnings.warn(
"Providing initial SQL data on a %s database will require "
"sqlparse in Django 1.9." % self.connection.vendor,
RemovedInDjango19Warning)
from django.core.management.sql import _split_statements
return _split_statements(sql)
else:
raise
else:
return [sqlparse.format(statement, strip_comments=True)
for statement in sqlparse.split(sql) if statement]
开发者ID:zulip,项目名称:truncated-django,代码行数:27,代码来源:operations.py
示例19: run
def run(self, statement, pgspecial=None, exception_formatter=None,
on_error_resume=False):
"""Execute the sql in the database and return the results.
:param statement: A string containing one or more sql statements
:param pgspecial: PGSpecial object
:param exception_formatter: A callable that accepts an Exception and
returns a formatted (title, rows, headers, status) tuple that can
act as a query result. If an exception_formatter is not supplied,
psycopg2 exceptions are always raised.
:param on_error_resume: Bool. If true, queries following an exception
(assuming exception_formatter has been supplied) continue to
execute.
:return: Generator yielding tuples containing
(title, rows, headers, status, query, success)
"""
# Remove spaces and EOL
statement = statement.strip()
if not statement: # Empty string
yield (None, None, None, None, statement, False)
# Split the sql into separate queries and run each one.
for sql in sqlparse.split(statement):
# Remove spaces, eol and semi-colons.
sql = sql.rstrip(';')
try:
if pgspecial:
# First try to run each query as special
_logger.debug('Trying a pgspecial command. sql: %r', sql)
cur = self.conn.cursor()
try:
for result in pgspecial.execute(cur, sql):
# e.g. execute_from_file already appends these
if len(result) < 6:
yield result + (sql, True)
else:
yield result
continue
except special.CommandNotFound:
pass
# Not a special command, so execute as normal sql
yield self.execute_normal_sql(sql) + (sql, True)
except psycopg2.DatabaseError as e:
_logger.error("sql: %r, error: %r", sql, e)
_logger.error("traceback: %r", traceback.format_exc())
if (isinstance(e, psycopg2.OperationalError)
or not exception_formatter):
# Always raise operational errors, regardless of on_error
# specification
raise
yield None, None, None, exception_formatter(e), sql, False
if not on_error_resume:
break
开发者ID:alobaid,项目名称:pgcli,代码行数:60,代码来源:pgexecute.py
示例20: test_issue485_split_multi
def test_issue485_split_multi():
p_sql = '''CREATE OR REPLACE RULE ruled_tab_2rules AS ON INSERT
TO public.ruled_tab
DO instead (
select 1;
select 2;
);'''
assert len(sqlparse.split(p_sql)) == 1
开发者ID:andialbrecht,项目名称:sqlparse,代码行数:8,代码来源:test_regressions.py
注:本文中的sqlparse.split函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论