本文整理汇总了Python中sqlparse.lexer.tokenize函数的典型用法代码示例。如果您正苦于以下问题:Python tokenize函数的具体用法?Python tokenize怎么用?Python tokenize使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了tokenize函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_stream_simple
def test_stream_simple():
stream = StringIO("SELECT 1; SELECT 2;")
tokens = lexer.tokenize(stream)
assert len(list(tokens)) == 9
stream.seek(0)
tokens = list(lexer.tokenize(stream))
assert len(tokens) == 9
stream.seek(0)
tokens = list(lexer.tokenize(stream))
assert len(tokens) == 9
开发者ID:andialbrecht,项目名称:sqlparse,代码行数:13,代码来源:test_tokenize.py
示例2: test_linebreaks
def test_linebreaks(self): # issue1
sql = 'foo\nbar\n'
tokens = lexer.tokenize(sql)
self.assertEqual(''.join(str(x[1]) for x in tokens), sql)
sql = 'foo\rbar\r'
tokens = lexer.tokenize(sql)
self.assertEqual(''.join(str(x[1]) for x in tokens), sql)
sql = 'foo\r\nbar\r\n'
tokens = lexer.tokenize(sql)
self.assertEqual(''.join(str(x[1]) for x in tokens), sql)
sql = 'foo\r\nbar\n'
tokens = lexer.tokenize(sql)
self.assertEqual(''.join(str(x[1]) for x in tokens), sql)
开发者ID:SaiyanRiku,项目名称:opendbviewer,代码行数:13,代码来源:test_tokenize.py
示例3: test_simple
def test_simple(self):
stream = StringIO("SELECT 1; SELECT 2;")
tokens = lexer.tokenize(stream)
self.assertEqual(len(list(tokens)), 9)
stream.seek(0)
tokens = list(lexer.tokenize(stream))
self.assertEqual(len(tokens), 9)
stream.seek(0)
tokens = list(lexer.tokenize(stream))
self.assertEqual(len(tokens), 9)
开发者ID:JasonMWhite,项目名称:sqlparse,代码行数:13,代码来源:test_tokenize.py
示例4: count
def count(self):
# Reset .order_by() which is not required for .count() and may cause
# 'column "FOO" must appear in the GROUP BY clause or be used in an aggregate function'
# error when particular column is in the list of currently applied order_by().
# .filter() seems not to be affected.
c = self.order_by()
# Rewrite query arguments to 'count(*)' function.
stmts = tokenize(c.query.sql)
rewrite_query = []
is_rewritten = False
copying = True
for token_type, token_value in stmts:
if copying:
rewrite_query.append(token_value)
if token_type == Token.Keyword.DML and token_value.upper() == 'SELECT':
copying = False
is_rewritten = True
rewrite_query.append(' count(*) ')
elif token_type == Token.Keyword and token_value.upper() == 'FROM':
copying = True
rewrite_query.append(token_value)
if is_rewritten:
c.query.sql = ''.join(rewrite_query)
query = iter(c.query)
for values in query:
count = values[0]
return count
# Fallback to approximate QuerySet.count() when SQL query rewrite failed.
return c.filtered_qs.count()
开发者ID:Dmitri-Sintsov,项目名称:django-jinja-knockout,代码行数:32,代码来源:query.py
示例5: parse
def parse(self, sql, encoding):
stream = lexer.tokenize(sql, encoding)
statements = _split_statements(stream)
stack = engine.FilterStack()
stack.enable_grouping()
for statement in statements:
yield stack.run(statement)
开发者ID:yodebu,项目名称:sqlparse,代码行数:7,代码来源:parsers.py
示例6: test_compact1
def test_compact1(self):
stream = compact(tokenize(self.sql))
result = Tokens2Unicode(stream)
self.assertEqual(result,
'INSERT INTO directories(inode)VALUES(:inode)LIMIT 1')
开发者ID:adamchainz,项目名称:sqlparse,代码行数:7,代码来源:test_functions.py
示例7: test_compact2
def test_compact2(self):
stream = tokenize(self.sql2)
result = compact(stream)
self.assertEqual(Tokens2Unicode(result),
'SELECT child_entry,asdf AS inode,creation FROM links WHERE '
'parent_dir==:parent_dir AND name==:name LIMIT 1')
开发者ID:XiaonuoGantan,项目名称:sqlparse,代码行数:8,代码来源:test_functions.py
示例8: test_tokenize_simple
def test_tokenize_simple():
s = 'select * from foo;'
stream = lexer.tokenize(s)
assert isinstance(stream, types.GeneratorType)
tokens = list(stream)
assert len(tokens) == 8
assert len(tokens[0]) == 2
assert tokens[0] == (T.Keyword.DML, 'select')
assert tokens[-1] == (T.Punctuation, ';')
开发者ID:andialbrecht,项目名称:sqlparse,代码行数:9,代码来源:test_tokenize.py
示例9: test_simple
def test_simple(self):
sql = 'select * from foo;'
stream = lexer.tokenize(sql)
self.assert_(type(stream) is types.GeneratorType)
tokens = list(stream)
self.assertEqual(len(tokens), 8)
self.assertEqual(len(tokens[0]), 2)
self.assertEqual(tokens[0], (Keyword.DML, u'select'))
self.assertEqual(tokens[-1], (Punctuation, u';'))
开发者ID:SaiyanRiku,项目名称:opendbviewer,代码行数:9,代码来源:test_tokenize.py
示例10: test_StripWhitespace3
def test_StripWhitespace3(self):
self.assertEqual(Tokens2Unicode(StripWhitespace(tokenize(self.sql3))),
'SELECT 0 AS st_dev,0 AS st_uid,0 AS st_gid,dir_entries.type AS '
'st_mode,dir_entries.inode AS st_ino,COUNT(links.child_entry)AS '
'st_nlink,:creation AS st_ctime,dir_entries.access AS st_atime,'
'dir_entries.modification AS st_mtime,COALESCE(files.size,0)AS '
'st_size,COALESCE(files.size,0)AS size FROM dir_entries LEFT JOIN'
' files ON dir_entries.inode==files.inode LEFT JOIN links ON '
'dir_entries.inode==links.child_entry WHERE dir_entries.inode=='
':inode GROUP BY dir_entries.inode LIMIT 1')
开发者ID:XiaonuoGantan,项目名称:sqlparse,代码行数:10,代码来源:test_filters.py
示例11: split
def split(sql, encoding=None):
"""Split *sql* into single statements.
:param sql: A string containting one or more SQL statements.
:param encoding: The encoding of the statement (optional).
:returns: A list of strings.
"""
stream = lexer.tokenize(sql, encoding)
splitter = StatementFilter()
stream = splitter.process(None, stream)
return [unicode(stmt).strip() for stmt in stream]
开发者ID:abrarsheikh,项目名称:sqlparse,代码行数:11,代码来源:__init__.py
示例12: test_inline_keywords
def test_inline_keywords(self): # issue 7
sql = "create created_foo"
tokens = list(lexer.tokenize(sql))
self.assertEqual(len(tokens), 3)
self.assertEqual(tokens[0][0], Keyword.DDL)
self.assertEqual(tokens[2][0], Name)
self.assertEqual(tokens[2][1], u'created_foo')
sql = "enddate"
tokens = list(lexer.tokenize(sql))
self.assertEqual(len(tokens), 1)
self.assertEqual(tokens[0][0], Name)
sql = "join_col"
tokens = list(lexer.tokenize(sql))
self.assertEqual(len(tokens), 1)
self.assertEqual(tokens[0][0], Name)
sql = "left join_col"
tokens = list(lexer.tokenize(sql))
self.assertEqual(len(tokens), 3)
self.assertEqual(tokens[2][0], Name)
self.assertEqual(tokens[2][1], 'join_col')
开发者ID:SaiyanRiku,项目名称:opendbviewer,代码行数:20,代码来源:test_tokenize.py
示例13: test_includeStatement
def test_includeStatement(self):
stream = tokenize(self.sql)
includeStatement = IncludeStatement('tests/files', raiseexceptions=True)
stream = includeStatement.process(None, stream)
stream = compact(stream)
result = Tokens2Unicode(stream)
self.assertEqual(result,
'INSERT INTO dir_entries(type)VALUES(:type);INSERT INTO '
'directories(inode)VALUES(:inode)LIMIT 1')
开发者ID:XiaonuoGantan,项目名称:sqlparse,代码行数:11,代码来源:test_functions.py
示例14: test_tokenize_inline_keywords
def test_tokenize_inline_keywords():
# issue 7
s = "create created_foo"
tokens = list(lexer.tokenize(s))
assert len(tokens) == 3
assert tokens[0][0] == T.Keyword.DDL
assert tokens[2][0] == T.Name
assert tokens[2][1] == 'created_foo'
s = "enddate"
tokens = list(lexer.tokenize(s))
assert len(tokens) == 1
assert tokens[0][0] == T.Name
s = "join_col"
tokens = list(lexer.tokenize(s))
assert len(tokens) == 1
assert tokens[0][0] == T.Name
s = "left join_col"
tokens = list(lexer.tokenize(s))
assert len(tokens) == 3
assert tokens[2][0] == T.Name
assert tokens[2][1] == 'join_col'
开发者ID:andialbrecht,项目名称:sqlparse,代码行数:21,代码来源:test_tokenize.py
示例15: run
def run(self, sql, encoding=None):
stream = lexer.tokenize(sql, encoding)
# Process token stream
if self.preprocess:
for filter_ in self.preprocess:
stream = filter_.process(self, stream)
if (self.stmtprocess or self.postprocess or self.split_statements
or self._grouping):
splitter = StatementFilter()
stream = splitter.process(self, stream)
# import StripCommentsFilter in the run() method to avoid a circular dependency.
# For stripping comments, the only grouping method we want to invoke is
# grouping.group(), this considerably improves performance.
strip_comments_only = False
if self.stmtprocess and len(self.stmtprocess) == 1:
from sqlparse.filters import StripCommentsFilter
strip_comments_only = isinstance(self.stmtprocess[0], StripCommentsFilter)
if self._grouping:
def _group(stream):
for stmt in stream:
if strip_comments_only:
grouping.group_comments(stmt)
else:
grouping.group(stmt)
yield stmt
stream = _group(stream)
if self.stmtprocess:
def _run1(stream):
ret = []
for stmt in stream:
for filter_ in self.stmtprocess:
filter_.process(self, stmt)
ret.append(stmt)
return ret
stream = _run1(stream)
if self.postprocess:
def _run2(stream):
for stmt in stream:
stmt.tokens = list(self._flatten(stmt.tokens))
for filter_ in self.postprocess:
stmt = filter_.process(self, stmt)
yield stmt
stream = _run2(stream)
return stream
开发者ID:1ack,项目名称:Impala,代码行数:51,代码来源:__init__.py
示例16: run
def run(self, sql):
stream = lexer.tokenize(sql)
# Process token stream
if self.preprocess:
for filter_ in self.preprocess:
stream = filter_.process(self, stream)
if (self.stmtprocess or self.postprocess or self.split_statements
or self._grouping):
splitter = StatementFilter()
stream = splitter.process(self, stream)
if self._grouping:
def _group(stream):
# modified by rrana
pass
for stmt in stream:
grouping.group(stmt)
yield stmt
stream = _group(stream)
if self.stmtprocess:
def _run1(stream):
ret = []
for stmt in stream:
for filter_ in self.stmtprocess:
filter_.process(self, stmt)
ret.append(stmt)
return ret
stream = _run1(stream)
if self.postprocess:
def _run2(stream):
for stmt in stream:
stmt.tokens = list(self._flatten(stmt.tokens))
for filter_ in self.postprocess:
stmt = filter_.process(self, stmt)
yield stmt
stream = _run2(stream)
return stream
开发者ID:askrht,项目名称:listobistats,代码行数:44,代码来源:__init__.py
示例17: format
def format(sql, **options):
"""Format *sql* according to *options*.
Available options are documented in :ref:`formatting`.
In addition to the formatting options this function accepts the
keyword "encoding" which determines the encoding of the statement.
:returns: The formatted SQL statement as string.
"""
options = formatter.validate_options(options)
encoding = options.pop('encoding', None)
stream = lexer.tokenize(sql, encoding)
stream = _format_pre_process(stream, options)
stack = engine.FilterStack()
stack = formatter.build_filter_stack(stack, options)
stack.postprocess.append(filters.SerializerUnicode())
statements = split2(stream)
return ''.join(stack.run(statement) for statement in statements)
开发者ID:abrarsheikh,项目名称:sqlparse,代码行数:19,代码来源:__init__.py
示例18: run
def run(self, sql, encoding=None):
stream = lexer.tokenize(sql, encoding)
# Process token stream
for filter_ in self.preprocess:
stream = filter_.process(stream)
stream = StatementSplitter().process(stream)
# Output: Stream processed Statements
for stmt in stream:
if self._grouping:
stmt = grouping.group(stmt)
for filter_ in self.stmtprocess:
filter_.process(stmt)
for filter_ in self.postprocess:
stmt = filter_.process(stmt)
yield stmt
开发者ID:AdamG,项目名称:sqlparse,代码行数:20,代码来源:filter_stack.py
示例19: test_istype2
def test_istype2(self):
stream = tokenize(self.sql2)
self.assertTrue(IsType('SELECT')(stream))
stream = tokenize(self.sql2)
self.assertFalse(IsType('INSERT')(stream))
开发者ID:adamchainz,项目名称:sqlparse,代码行数:6,代码来源:test_functions.py
示例20: test_backticks
def test_backticks(self):
sql = '`foo`.`bar`'
tokens = list(lexer.tokenize(sql))
self.assertEqual(len(tokens), 3)
self.assertEqual(tokens[0], (Name, u'`foo`'))
开发者ID:SaiyanRiku,项目名称:opendbviewer,代码行数:5,代码来源:test_tokenize.py
注:本文中的sqlparse.lexer.tokenize函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论