578 lines
21 KiB
Python
578 lines
21 KiB
Python
"""Utilities parsing and analyzing Python code."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
import contextlib
|
|
import inspect
|
|
import itertools
|
|
import re
|
|
import tokenize
|
|
from inspect import Signature
|
|
from token import DEDENT, INDENT, NAME, NEWLINE, NUMBER, OP, STRING
|
|
from tokenize import COMMENT, NL
|
|
from typing import Any
|
|
|
|
from sphinx.pycode.ast import unparse as ast_unparse
|
|
|
|
comment_re = re.compile('^\\s*#: ?(.*)\r?\n?$')
|
|
indent_re = re.compile('^\\s*$')
|
|
emptyline_re = re.compile('^\\s*(#.*)?$')
|
|
|
|
|
|
def filter_whitespace(code: str) -> str:
    """Return *code* with each form feed (FF) character turned into a space."""
    # Splitting on FF and re-joining with a space swaps every occurrence.
    return ' '.join(code.split('\f'))
|
|
|
|
|
|
def get_assign_targets(node: ast.AST) -> list[ast.expr]:
    """Return the assignment targets of an ``Assign`` or ``AnnAssign`` node.

    An ``ast.Assign`` node already stores its targets as a list, which is
    returned as-is.  Any other node is expected to expose a single ``target``
    attribute (as ``ast.AnnAssign`` does); it is wrapped in a new list.
    """
    if not isinstance(node, ast.Assign):
        return [node.target]  # type: ignore[attr-defined]
    return node.targets
|
|
|
|
|
|
def get_lvar_names(node: ast.AST, self: ast.arg | None = None) -> list[str]:
    """Collect the variable names bound by an assignment target.

    :param node: the target expression of an assignment (a plain ``str`` is
        also accepted and returned wrapped in a list).
    :param self: when given, the argument node naming the instance; only
        ``<self>`` itself and ``<self>.attr`` targets are then accepted.
    :raises TypeError: if the target does not create a new variable::

            ary[0] = 'foo'
            dic["bar"] = 'baz'
            # => TypeError

        or if *self* is given and the target is not bound to it.
    :raises NotImplementedError: for any node type not handled here.
    """
    receiver = self.arg if self else None
    kind = node.__class__.__name__

    if kind in ('Constant', 'Index', 'Slice', 'Subscript'):
        raise TypeError('%r does not create new variable' % node)

    if kind == 'Name':
        if self is not None and node.id != receiver:  # type: ignore[attr-defined]
            raise TypeError('The assignment %r is not instance variable' % node)
        return [node.id]  # type: ignore[attr-defined]

    if kind in ('Tuple', 'List'):
        names = []
        for element in node.elts:  # type: ignore[attr-defined]
            # Elements that do not bind a name are skipped, not fatal.
            try:
                names.extend(get_lvar_names(element, self))
            except TypeError:
                continue
        return names

    if kind == 'Attribute':
        owner = node.value  # type: ignore[attr-defined]
        bound_to_receiver = (
            owner.__class__.__name__ == 'Name'
            and self
            and owner.id == receiver
        )
        if not bound_to_receiver:
            raise TypeError('The assignment %r is not instance variable' % node)
        # instance variable: the attribute name goes through the 'str' case
        return ["%s" % get_lvar_names(node.attr, self)[0]]  # type: ignore[attr-defined]

    if kind == 'str':
        return [node]  # type: ignore[list-item]

    if kind == 'Starred':
        return get_lvar_names(node.value, self)  # type: ignore[attr-defined]

    raise NotImplementedError('Unexpected node name %r' % kind)
|
|
|
|
|
|
def dedent_docstring(s: str) -> str:
    """Strip the shared leading indentation from the docstring text *s*.

    The text is attached to a throwaway function so that ``inspect.getdoc``
    performs the clean-up; leading and trailing CR/LF characters are then
    removed.  An empty string is returned when ``inspect.getdoc`` yields
    nothing.
    """
    def carrier() -> None:
        """Placeholder; ``__doc__`` is overwritten just below."""

    carrier.__doc__ = s
    cleaned = inspect.getdoc(carrier)
    return cleaned.strip("\r\n") if cleaned else ""
|
|
|
|
|
|
class Token:
    """Convenience wrapper around one token from the :mod:`tokenize` module.

    Besides storing the five fields of a token, it supports loose equality:
    against an ``int`` (token kind), a ``str`` (token text) or a
    ``list``/``tuple`` pair of ``(kind, text)``.
    """

    def __init__(self, kind: int, value: Any, start: tuple[int, int], end: tuple[int, int],
                 source: str) -> None:
        self.kind, self.value = kind, value
        self.start, self.end = start, end
        self.source = source

    def __eq__(self, other: Any) -> bool:
        """Compare by kind, by text, or by a ``(kind, text)`` pair.

        ``None`` never matches; any other unsupported operand raises
        ``ValueError``.
        """
        if other is None:
            return False
        if isinstance(other, int):
            return self.kind == other
        if isinstance(other, str):
            return self.value == other
        if isinstance(other, (list, tuple)):
            return [self.kind, self.value] == list(other)
        raise ValueError('Unknown value: %r' % other)

    def match(self, *conditions: Any) -> bool:
        """Return True if this token equals at least one of *conditions*."""
        for candidate in conditions:
            if self == candidate:
                return True
        return False

    def __repr__(self) -> str:
        kind_name = tokenize.tok_name[self.kind]
        text = self.value.strip()
        return f'<Token kind={kind_name!r} value={text!r}>'
|
|
|
|
|
|
class TokenProcessor:
    """Sequential reader over the token stream of a list of source lines."""

    def __init__(self, buffers: list[str]) -> None:
        self.buffers = buffers
        # tokenize pulls one line per call from this iterator.
        line_iter = iter(buffers)
        self.tokens = tokenize.generate_tokens(line_iter.__next__)
        self.current: Token | None = None
        self.previous: Token | None = None

    def get_line(self, lineno: int) -> str:
        """Return the source line with the given 1-based line number."""
        return self.buffers[lineno - 1]

    def fetch_token(self) -> Token | None:
        """Advance to the next token and return it.

        The token that was current becomes ``self.previous``.  ``None`` is
        stored and returned once the token stream is exhausted.
        """
        self.previous = self.current
        raw = next(self.tokens, None)
        self.current = None if raw is None else Token(*raw)
        return self.current

    def fetch_until(self, condition: Any) -> list[Token]:
        """Consume tokens up to and including the one equal to *condition*.

        .. note:: Bracketed groups are consumed recursively, so a closing
                  token nested inside ``()``, ``{}`` or ``[]`` does not end
                  the scan.

        Returns every token consumed, including those of nested groups.
        """
        brackets = (('(', ')'), ('{', '}'), ('[', ']'))
        collected = []
        while True:
            tok = self.fetch_token()
            if not tok:
                break
            collected.append(tok)
            if tok == condition:
                break
            for opener, closer in brackets:
                if tok == [OP, opener]:
                    collected += self.fetch_until([OP, closer])
                    break

        return collected
|
|
|
|
|
|
class AfterCommentParser(TokenProcessor):
    """Token-based parser that finds the comment trailing an assignment.

    The input must start with an assignment statement.  After ``parse()``,
    ``self.comment`` holds the comment token's text if one follows the
    statement, and stays ``None`` otherwise.
    """

    def __init__(self, lines: list[str]) -> None:
        super().__init__(lines)
        self.comment: str | None = None

    def fetch_rvalue(self) -> list[Token]:
        """Consume and return the tokens of the assignment's right-hand side."""
        # Openers whose whole group is swallowed through fetch_until().
        nesting = (
            ([OP, '('], [OP, ')']),
            ([OP, '{'], [OP, '}']),
            ([OP, '['], [OP, ']']),
            (INDENT, DEDENT),
        )
        operand_kinds = {OP, NAME, NUMBER, STRING}

        collected = []
        while True:
            tok = self.fetch_token()
            if not tok:
                break
            collected.append(tok)
            closer = next((end for begin, end in nesting if tok == begin), None)
            if closer is not None:
                collected += self.fetch_until(closer)
            elif tok == [OP, ';'] or tok.kind not in operand_kinds:
                # a statement separator or a non-expression token ends the value
                break

        return collected

    def parse(self) -> None:
        """Scan the statement and record the comment that follows it, if any."""
        # skip lvalue (or whole of AnnAssign)
        tok = self.fetch_token()
        while tok and not tok.match([OP, '='], NEWLINE, COMMENT):
            tok = self.fetch_token()
        assert tok is not None

        # skip rvalue (if exists)
        if tok == [OP, '=']:
            self.fetch_rvalue()
            tok = self.current
            assert tok is not None

        if tok == COMMENT:
            self.comment = tok.value
|
|
|
|
|
|
class VariableCommentPicker(ast.NodeVisitor):
    """AST visitor that picks up variable comments from Python source.

    While walking a module it records:

    * ``comments`` -- ``(basename, name)`` -> comment text, taken from a
      ``#:`` comment after or before an assignment, or from a string
      expression directly following it;
    * ``annotations`` -- ``(basename, name)`` -> unparsed annotation;
    * ``deforders`` -- dotted qualified name -> definition order index;
    * ``finals`` -- dotted names of classes/functions decorated as final;
    * ``overloads`` -- dotted name -> signatures of overload definitions.
    """

    def __init__(self, buffers: list[str], encoding: str) -> None:
        self.counter = itertools.count()
        self.buffers = buffers
        self.encoding = encoding
        self.context: list[str] = []
        self.current_classes: list[str] = []
        self.current_function: ast.FunctionDef | None = None
        self.comments: dict[tuple[str, str], str] = {}
        self.annotations: dict[tuple[str, str], str] = {}
        self.previous: ast.AST | None = None
        self.deforders: dict[str, int] = {}
        self.finals: list[str] = []
        self.overloads: dict[str, list[Signature]] = {}
        # Local names under which ``typing`` and its helpers were imported.
        self.typing: str | None = None
        self.typing_final: str | None = None
        self.typing_overload: str | None = None
        super().__init__()

    def get_qualname_for(self, name: str) -> list[str] | None:
        """Get qualified name for given object as a list of string(s).

        Returns ``None`` for names inside a function, except inside the
        ``__init__`` method of a class, where the name is attributed to the
        enclosing class.
        """
        if self.current_function:
            if self.current_classes and self.context[-1] == "__init__":
                # store variable comments inside __init__ method of classes
                return self.context[:-1] + [name]
            else:
                return None
        else:
            return self.context + [name]

    def add_entry(self, name: str) -> None:
        """Record the definition order of *name* (if it has a qualname)."""
        qualname = self.get_qualname_for(name)
        if qualname:
            self.deforders[".".join(qualname)] = next(self.counter)

    def add_final_entry(self, name: str) -> None:
        """Record *name* as final (if it has a qualname)."""
        qualname = self.get_qualname_for(name)
        if qualname:
            self.finals.append(".".join(qualname))

    def add_overload_entry(self, func: ast.FunctionDef) -> None:
        """Record the signature of the overload definition *func*."""
        # avoid circular import problem
        from sphinx.util.inspect import signature_from_ast
        qualname = self.get_qualname_for(func.name)
        if qualname:
            overloads = self.overloads.setdefault(".".join(qualname), [])
            overloads.append(signature_from_ast(func))

    def add_variable_comment(self, name: str, comment: str) -> None:
        """Store *comment* for the variable *name* (if it has a qualname)."""
        qualname = self.get_qualname_for(name)
        if qualname:
            basename = ".".join(qualname[:-1])
            self.comments[(basename, name)] = comment

    def add_variable_annotation(self, name: str, annotation: ast.AST) -> None:
        """Store the unparsed *annotation* for *name* (if it has a qualname)."""
        qualname = self.get_qualname_for(name)
        if qualname:
            basename = ".".join(qualname[:-1])
            self.annotations[(basename, name)] = ast_unparse(annotation)

    def _matches_typing_decorator(self, decorators: list[ast.expr], attr: str,
                                  alias: str | None) -> bool:
        """Return True if any decorator is ``<typing>.<attr>`` or *alias*.

        ``<typing>`` is the local name recorded for ``import typing`` and
        *alias* the local name recorded for the directly imported helper.
        """
        candidates = []
        if self.typing:
            candidates.append(f'{self.typing}.{attr}')
        if alias:
            candidates.append(alias)

        for decorator in decorators:
            try:
                if ast_unparse(decorator) in candidates:
                    return True
            except NotImplementedError:
                # decorator expression that cannot be unparsed: not a match
                pass

        return False

    def is_final(self, decorators: list[ast.expr]) -> bool:
        """Return True if *decorators* contains the ``final`` decorator."""
        return self._matches_typing_decorator(decorators, 'final', self.typing_final)

    def is_overload(self, decorators: list[ast.expr]) -> bool:
        """Return True if *decorators* contains the ``overload`` decorator."""
        return self._matches_typing_decorator(decorators, 'overload', self.typing_overload)

    def get_self(self) -> ast.arg | None:
        """Returns the first argument of the current function, if any.

        Positional-only parameters precede regular ones in a signature, so
        they are consulted first; otherwise ``def f(self, /, x)`` would
        report ``x`` as the first argument.
        """
        func = self.current_function
        if func is None:
            return None
        if func.args.posonlyargs:
            return func.args.posonlyargs[0]
        if func.args.args:
            return func.args.args[0]
        return None

    def get_line(self, lineno: int) -> str:
        """Returns specified line."""
        return self.buffers[lineno - 1]

    def visit(self, node: ast.AST) -> None:
        """Visit *node*, then update self.previous to it."""
        super().visit(node)
        self.previous = node

    def visit_Import(self, node: ast.Import) -> None:
        """Handles Import node and record the order of definitions."""
        for name in node.names:
            self.add_entry(name.asname or name.name)

            if name.name == 'typing':
                self.typing = name.asname or name.name
            elif name.name == 'typing.final':
                self.typing_final = name.asname or name.name
            elif name.name == 'typing.overload':
                self.typing_overload = name.asname or name.name

    def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
        """Handles ImportFrom node and record the order of definitions."""
        for name in node.names:
            self.add_entry(name.asname or name.name)

            if node.module == 'typing' and name.name == 'final':
                self.typing_final = name.asname or name.name
            elif node.module == 'typing' and name.name == 'overload':
                self.typing_overload = name.asname or name.name

    def visit_Assign(self, node: ast.Assign) -> None:
        """Handles Assign node and pick up a variable comment."""
        try:
            targets = get_assign_targets(node)
            self_arg = self.get_self()
            # Flatten the per-target name lists; the generator is consumed
            # inside the ``try`` so a TypeError from any target is caught.
            varnames: list[str] = list(itertools.chain.from_iterable(
                get_lvar_names(t, self=self_arg) for t in targets
            ))
            current_line = self.get_line(node.lineno)
        except TypeError:
            return  # this assignment is not new definition!

        # record annotation
        if hasattr(node, 'annotation') and node.annotation:
            for varname in varnames:
                self.add_variable_annotation(varname, node.annotation)
        elif hasattr(node, 'type_comment') and node.type_comment:
            for varname in varnames:
                self.add_variable_annotation(
                    varname, node.type_comment)  # type: ignore[arg-type]

        # check comments after assignment
        parser = AfterCommentParser([current_line[node.col_offset:]] +
                                    self.buffers[node.lineno:])
        parser.parse()
        if parser.comment and comment_re.match(parser.comment):
            comment = comment_re.sub('\\1', parser.comment)
            for varname in varnames:
                self.add_variable_comment(varname, comment)
                self.add_entry(varname)
            return

        # check comments before assignment
        if indent_re.match(current_line[:node.col_offset]):
            comment_lines = []
            # walk upwards from the line just above the assignment
            for lineno in range(node.lineno - 1, 0, -1):
                before_line = self.get_line(lineno)
                if comment_re.match(before_line):
                    comment_lines.append(comment_re.sub('\\1', before_line))
                else:
                    break

            if comment_lines:
                comment = dedent_docstring('\n'.join(reversed(comment_lines)))
                for varname in varnames:
                    self.add_variable_comment(varname, comment)
                    self.add_entry(varname)
                return

        # not commented (record deforders only)
        for varname in varnames:
            self.add_entry(varname)

    def visit_AnnAssign(self, node: ast.AnnAssign) -> None:
        """Handles AnnAssign node and pick up a variable comment."""
        self.visit_Assign(node)  # type: ignore[arg-type]

    def visit_Expr(self, node: ast.Expr) -> None:
        """Handles Expr node and pick up a comment if string.

        Only a string constant that directly follows an assignment is used;
        it becomes the comment of the names bound by the first target.
        """
        if not (isinstance(self.previous, (ast.Assign, ast.AnnAssign)) and
                isinstance(node.value, ast.Constant) and
                isinstance(node.value.value, str)):
            return

        try:
            targets = get_assign_targets(self.previous)
            varnames = get_lvar_names(targets[0], self.get_self())
        except TypeError:
            return  # this assignment is not new definition!

        # The guard above guarantees a ``str`` value, so no decoding is needed.
        docstring = dedent_docstring(node.value.value)
        for varname in varnames:
            self.add_variable_comment(varname, docstring)
            self.add_entry(varname)

    def visit_Try(self, node: ast.Try) -> None:
        """Handles Try node and processes body and else-clause.

        .. note:: pycode parser ignores objects definition in except-clause.
        """
        for subnode in node.body:
            self.visit(subnode)
        for subnode in node.orelse:
            self.visit(subnode)

    def visit_ClassDef(self, node: ast.ClassDef) -> None:
        """Handles ClassDef node and set context."""
        self.current_classes.append(node.name)
        self.add_entry(node.name)
        if self.is_final(node.decorator_list):
            self.add_final_entry(node.name)
        self.context.append(node.name)
        self.previous = node
        for child in node.body:
            self.visit(child)
        self.context.pop()
        self.current_classes.pop()

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        """Handles FunctionDef node and set context.

        Functions nested inside another function are not processed.
        """
        if self.current_function is None:
            self.add_entry(node.name)  # should be called before setting self.current_function
            if self.is_final(node.decorator_list):
                self.add_final_entry(node.name)
            if self.is_overload(node.decorator_list):
                self.add_overload_entry(node)
            self.context.append(node.name)
            self.current_function = node
            for child in node.body:
                self.visit(child)
            self.context.pop()
            self.current_function = None

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        """Handles AsyncFunctionDef node and set context."""
        self.visit_FunctionDef(node)  # type: ignore[arg-type]
|
|
|
|
|
|
class DefinitionFinder(TokenProcessor):
    """Token-based parser that locates functions, classes and methods.

    After ``parse()``, ``self.definitions`` maps each dotted name to a
    ``(type, start_line, end_line)`` tuple, where type is ``'class'`` or
    ``'def'``.
    """

    def __init__(self, lines: list[str]) -> None:
        super().__init__(lines)
        self.decorator: Token | None = None
        self.context: list[str] = []
        self.indents: list[tuple[str, str | None, int | None]] = []
        self.definitions: dict[str, tuple[str, int, int]] = {}

    def add_definition(self, name: str, entry: tuple[str, int, int]) -> None:
        """Record the location *entry* under *name*.

        A ``def`` found while the innermost open block is itself a ``def``
        is an inner function and is not recorded.
        """
        inside_function = bool(self.indents) and self.indents[-1][0] == 'def'
        if inside_function and entry[0] == 'def':
            return
        self.definitions[name] = entry

    def parse(self) -> None:
        """Walk the whole token stream and collect definition locations."""
        while (token := self.fetch_token()) is not None:
            if token == COMMENT:
                continue
            if token == [OP, '@'] and (self.previous is None or
                                       self.previous.match(NEWLINE, NL, INDENT, DEDENT)):
                # Only the first decorator of a stack marks the start line.
                if self.decorator is None:
                    self.decorator = token
            elif token.kind == NAME and token.value in ('class', 'def'):
                self.parse_definition(token.value)
            elif token == INDENT:
                self.indents.append(('other', None, None))
            elif token == DEDENT:
                self.finalize_block()

    def parse_definition(self, typ: str) -> None:
        """Handle a ``class``/``def`` header whose keyword was just consumed."""
        name = self.fetch_token()
        self.context.append(name.value)  # type: ignore[union-attr]
        funcname = '.'.join(self.context)

        # A pending decorator, when present, supplies the starting line.
        anchor = self.decorator or name
        start_pos = anchor.start[0]  # type: ignore[union-attr]
        self.decorator = None

        self.fetch_until([OP, ':'])
        after_colon = self.fetch_token()
        if after_colon.match(COMMENT, NEWLINE):  # type: ignore[union-attr]
            # block body: completed later by finalize_block()
            self.fetch_until(INDENT)
            self.indents.append((typ, funcname, start_pos))
            return

        # one-liner
        self.add_definition(funcname,
                            (typ, start_pos, name.end[0]))  # type: ignore[union-attr]
        self.context.pop()

    def finalize_block(self) -> None:
        """Close the innermost indented block, recording it if a definition."""
        typ, funcname, start_pos = self.indents.pop()
        if typ == 'other':
            return

        end_pos = self.current.end[0] - 1  # type: ignore[union-attr]
        # back up over trailing blank and comment-only lines
        while emptyline_re.match(self.get_line(end_pos)):
            end_pos -= 1

        self.add_definition(funcname, (typ, start_pos, end_pos))  # type: ignore[arg-type]
        self.context.pop()
|
|
|
|
|
|
class Parser:
    """Front end that gathers comments and definition locations of a module.

    It runs ``VariableCommentPicker`` and ``DefinitionFinder`` over the given
    code and exposes their results as attributes.
    """

    def __init__(self, code: str, encoding: str = 'utf-8') -> None:
        self.code = filter_whitespace(code)
        self.encoding = encoding
        # Results; all empty until parse() has run.
        self.annotations: dict[tuple[str, str], str] = {}
        self.comments: dict[tuple[str, str], str] = {}
        self.deforders: dict[str, int] = {}
        self.definitions: dict[str, tuple[str, int, int]] = {}
        self.finals: list[str] = []
        self.overloads: dict[str, list[Signature]] = {}

    def parse(self) -> None:
        """Run both passes: comments first, then definition locations."""
        self.parse_comments()
        self.parse_definition()

    def parse_comments(self) -> None:
        """Walk the AST of the code and copy over what the picker collected."""
        tree = ast.parse(self.code, type_comments=True)
        source_lines = self.code.splitlines(keepends=True)
        picker = VariableCommentPicker(source_lines, self.encoding)
        picker.visit(tree)

        self.annotations = picker.annotations
        self.comments = picker.comments
        self.deforders = picker.deforders
        self.finals = picker.finals
        self.overloads = picker.overloads

    def parse_definition(self) -> None:
        """Tokenize the code and store the locations of its definitions."""
        finder = DefinitionFinder(self.code.splitlines(keepends=True))
        finder.parse()
        self.definitions = finder.definitions
|