Skip to content

typing: resultlog, pytester, longrepr #7601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions src/_pytest/junitxml.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from _pytest import nodes
from _pytest import timing
from _pytest._code.code import ExceptionRepr
from _pytest._code.code import ReprFileLocation
from _pytest.config import Config
from _pytest.config import filename_arg
from _pytest.config.argparsing import Parser
Expand Down Expand Up @@ -200,8 +201,11 @@ def append_failure(self, report: TestReport) -> None:
self._add_simple("skipped", "xfail-marked test passes unexpectedly")
else:
assert report.longrepr is not None
if getattr(report.longrepr, "reprcrash", None) is not None:
message = report.longrepr.reprcrash.message
reprcrash = getattr(
report.longrepr, "reprcrash", None
) # type: Optional[ReprFileLocation]
if reprcrash is not None:
message = reprcrash.message
else:
message = str(report.longrepr)
message = bin_xml_escape(message)
Expand All @@ -217,8 +221,11 @@ def append_collect_skipped(self, report: TestReport) -> None:

def append_error(self, report: TestReport) -> None:
assert report.longrepr is not None
if getattr(report.longrepr, "reprcrash", None) is not None:
reason = report.longrepr.reprcrash.message
reprcrash = getattr(
report.longrepr, "reprcrash", None
) # type: Optional[ReprFileLocation]
if reprcrash is not None:
reason = reprcrash.message
else:
reason = str(report.longrepr)

Expand All @@ -237,7 +244,7 @@ def append_skipped(self, report: TestReport) -> None:
skipped = ET.Element("skipped", type="pytest.xfail", message=xfailreason)
self.append(skipped)
else:
assert report.longrepr is not None
assert isinstance(report.longrepr, tuple)
filename, lineno, skipreason = report.longrepr
if skipreason.startswith("Skipped: "):
skipreason = skipreason[9:]
Expand Down
111 changes: 86 additions & 25 deletions src/_pytest/pytester.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from _pytest import timing
from _pytest._code import Source
from _pytest.capture import _get_multicapture
from _pytest.compat import overload
from _pytest.compat import TYPE_CHECKING
from _pytest.config import _PluggyPlugin
from _pytest.config import Config
Expand All @@ -42,11 +43,13 @@
from _pytest.pathlib import make_numbered_dir
from _pytest.pathlib import Path
from _pytest.python import Module
from _pytest.reports import CollectReport
from _pytest.reports import TestReport
from _pytest.tmpdir import TempdirFactory

if TYPE_CHECKING:
from typing import Type
from typing_extensions import Literal

import pexpect

Expand Down Expand Up @@ -180,24 +183,24 @@ def gethookrecorder(self, hook) -> "HookRecorder":
return hookrecorder


def get_public_names(values):
def get_public_names(values: Iterable[str]) -> List[str]:
"""Only return names from iterator values without a leading underscore."""
return [x for x in values if x[0] != "_"]


class ParsedCall:
def __init__(self, name, kwargs):
def __init__(self, name: str, kwargs) -> None:
self.__dict__.update(kwargs)
self._name = name

def __repr__(self):
def __repr__(self) -> str:
d = self.__dict__.copy()
del d["_name"]
return "<ParsedCall {!r}(**{!r})>".format(self._name, d)

if TYPE_CHECKING:
# The class has undetermined attributes, this tells mypy about it.
def __getattr__(self, key):
def __getattr__(self, key: str):
raise NotImplementedError()


Expand All @@ -211,6 +214,7 @@ class HookRecorder:
def __init__(self, pluginmanager: PytestPluginManager) -> None:
self._pluginmanager = pluginmanager
self.calls = [] # type: List[ParsedCall]
self.ret = None # type: Optional[Union[int, ExitCode]]

def before(hook_name: str, hook_impls, kwargs) -> None:
self.calls.append(ParsedCall(hook_name, kwargs))
Expand All @@ -228,7 +232,7 @@ def getcalls(self, names: Union[str, Iterable[str]]) -> List[ParsedCall]:
names = names.split()
return [call for call in self.calls if call._name in names]

def assert_contains(self, entries) -> None:
def assert_contains(self, entries: Sequence[Tuple[str, str]]) -> None:
__tracebackhide__ = True
i = 0
entries = list(entries)
Expand Down Expand Up @@ -266,22 +270,46 @@ def getcall(self, name: str) -> ParsedCall:

# functionality for test reports

@overload
def getreports(
self, names: "Literal['pytest_collectreport']",
) -> Sequence[CollectReport]:
raise NotImplementedError()

@overload # noqa: F811
def getreports( # noqa: F811
self, names: "Literal['pytest_runtest_logreport']",
) -> Sequence[TestReport]:
raise NotImplementedError()

@overload # noqa: F811
def getreports( # noqa: F811
self,
names: Union[str, Iterable[str]] = (
"pytest_collectreport",
"pytest_runtest_logreport",
),
) -> Sequence[Union[CollectReport, TestReport]]:
raise NotImplementedError()

def getreports( # noqa: F811
self,
names: Union[
str, Iterable[str]
] = "pytest_runtest_logreport pytest_collectreport",
) -> List[TestReport]:
names: Union[str, Iterable[str]] = (
"pytest_collectreport",
"pytest_runtest_logreport",
),
) -> Sequence[Union[CollectReport, TestReport]]:
return [x.report for x in self.getcalls(names)]

def matchreport(
self,
inamepart: str = "",
names: Union[
str, Iterable[str]
] = "pytest_runtest_logreport pytest_collectreport",
when=None,
):
names: Union[str, Iterable[str]] = (
"pytest_runtest_logreport",
"pytest_collectreport",
),
when: Optional[str] = None,
) -> Union[CollectReport, TestReport]:
"""Return a testreport whose dotted import path matches."""
values = []
for rep in self.getreports(names=names):
Expand All @@ -305,26 +333,56 @@ def matchreport(
)
return values[0]

@overload
def getfailures(
self, names: "Literal['pytest_collectreport']",
) -> Sequence[CollectReport]:
raise NotImplementedError()

@overload # noqa: F811
def getfailures( # noqa: F811
self, names: "Literal['pytest_runtest_logreport']",
) -> Sequence[TestReport]:
raise NotImplementedError()

@overload # noqa: F811
def getfailures( # noqa: F811
self,
names: Union[
str, Iterable[str]
] = "pytest_runtest_logreport pytest_collectreport",
) -> List[TestReport]:
names: Union[str, Iterable[str]] = (
"pytest_collectreport",
"pytest_runtest_logreport",
),
) -> Sequence[Union[CollectReport, TestReport]]:
raise NotImplementedError()

def getfailures( # noqa: F811
self,
names: Union[str, Iterable[str]] = (
"pytest_collectreport",
"pytest_runtest_logreport",
),
) -> Sequence[Union[CollectReport, TestReport]]:
return [rep for rep in self.getreports(names) if rep.failed]

def getfailedcollections(self) -> List[TestReport]:
def getfailedcollections(self) -> Sequence[CollectReport]:
return self.getfailures("pytest_collectreport")

def listoutcomes(
self,
) -> Tuple[List[TestReport], List[TestReport], List[TestReport]]:
) -> Tuple[
Sequence[TestReport],
Sequence[Union[CollectReport, TestReport]],
Sequence[Union[CollectReport, TestReport]],
]:
passed = []
skipped = []
failed = []
for rep in self.getreports("pytest_collectreport pytest_runtest_logreport"):
for rep in self.getreports(
("pytest_collectreport", "pytest_runtest_logreport")
):
if rep.passed:
if rep.when == "call":
assert isinstance(rep, TestReport)
passed.append(rep)
elif rep.skipped:
skipped.append(rep)
Expand Down Expand Up @@ -879,7 +937,7 @@ def runitem(self, source):
runner = testclassinstance.getrunner()
return runner(item)

def inline_runsource(self, source, *cmdlineargs):
def inline_runsource(self, source, *cmdlineargs) -> HookRecorder:
"""Run a test module in process using ``pytest.main()``.

This run writes "source" into a temporary file and runs
Expand All @@ -896,7 +954,7 @@ def inline_runsource(self, source, *cmdlineargs):
values = list(cmdlineargs) + [p]
return self.inline_run(*values)

def inline_genitems(self, *args):
def inline_genitems(self, *args) -> Tuple[List[Item], HookRecorder]:
"""Run ``pytest.main(['--collectonly'])`` in-process.

Runs the :py:func:`pytest.main` function to run all of pytest inside
Expand All @@ -907,7 +965,9 @@ def inline_genitems(self, *args):
items = [x.item for x in rec.getcalls("pytest_itemcollected")]
return items, rec

def inline_run(self, *args, plugins=(), no_reraise_ctrlc: bool = False):
def inline_run(
self, *args, plugins=(), no_reraise_ctrlc: bool = False
) -> HookRecorder:
"""Run ``pytest.main()`` in-process, returning a HookRecorder.

Runs the :py:func:`pytest.main` function to run all of pytest inside
Expand Down Expand Up @@ -962,7 +1022,7 @@ def pytest_configure(x, config: Config) -> None:
class reprec: # type: ignore
pass

reprec.ret = ret # type: ignore[attr-defined]
reprec.ret = ret

# Typically we reraise keyboard interrupts from the child run
# because it's our user requesting interruption of the testing.
Expand Down Expand Up @@ -1010,6 +1070,7 @@ class reprec: # type: ignore
sys.stdout.write(out)
sys.stderr.write(err)

assert reprec.ret is not None
res = RunResult(
reprec.ret, out.splitlines(), err.splitlines(), timing.time() - now
)
Expand Down
37 changes: 23 additions & 14 deletions src/_pytest/reports.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from io import StringIO
from pprint import pprint
from typing import Any
from typing import cast
from typing import Dict
from typing import Iterable
from typing import Iterator
Expand All @@ -15,6 +16,7 @@

from _pytest._code.code import ExceptionChainRepr
from _pytest._code.code import ExceptionInfo
from _pytest._code.code import ExceptionRepr
from _pytest._code.code import ReprEntry
from _pytest._code.code import ReprEntryNative
from _pytest._code.code import ReprExceptionInfo
Expand Down Expand Up @@ -57,8 +59,9 @@ def getworkerinfoline(node):
class BaseReport:
when = None # type: Optional[str]
location = None # type: Optional[Tuple[str, Optional[int], str]]
# TODO: Improve this Any.
longrepr = None # type: Optional[Any]
longrepr = (
None
) # type: Union[None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr]
sections = [] # type: List[Tuple[str, str]]
nodeid = None # type: str

Expand All @@ -79,7 +82,8 @@ def toterminal(self, out: TerminalWriter) -> None:
return

if hasattr(longrepr, "toterminal"):
longrepr.toterminal(out)
longrepr_terminal = cast(TerminalRepr, longrepr)
longrepr_terminal.toterminal(out)
else:
try:
s = str(longrepr)
Expand Down Expand Up @@ -233,7 +237,9 @@ def __init__(
location: Tuple[str, Optional[int], str],
keywords,
outcome: "Literal['passed', 'failed', 'skipped']",
longrepr,
longrepr: Union[
None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr
],
when: "Literal['setup', 'call', 'teardown']",
sections: Iterable[Tuple[str, str]] = (),
duration: float = 0,
Expand Down Expand Up @@ -293,8 +299,9 @@ def from_item_and_call(cls, item: Item, call: "CallInfo[None]") -> "TestReport":
sections = []
if not call.excinfo:
outcome = "passed" # type: Literal["passed", "failed", "skipped"]
# TODO: Improve this Any.
longrepr = None # type: Optional[Any]
longrepr = (
None
) # type: Union[None, ExceptionInfo[BaseException], Tuple[str, int, str], str, TerminalRepr]
else:
if not isinstance(excinfo, ExceptionInfo):
outcome = "failed"
Expand Down Expand Up @@ -372,7 +379,7 @@ def __repr__(self) -> str:


class CollectErrorRepr(TerminalRepr):
def __init__(self, msg) -> None:
def __init__(self, msg: str) -> None:
self.longrepr = msg

def toterminal(self, out: TerminalWriter) -> None:
Expand Down Expand Up @@ -436,16 +443,18 @@ def serialize_repr_crash(
else:
return None

def serialize_longrepr(rep: BaseReport) -> Dict[str, Any]:
def serialize_exception_longrepr(rep: BaseReport) -> Dict[str, Any]:
assert rep.longrepr is not None
# TODO: Investigate whether the duck typing is really necessary here.
longrepr = cast(ExceptionRepr, rep.longrepr)
result = {
"reprcrash": serialize_repr_crash(rep.longrepr.reprcrash),
"reprtraceback": serialize_repr_traceback(rep.longrepr.reprtraceback),
"sections": rep.longrepr.sections,
"reprcrash": serialize_repr_crash(longrepr.reprcrash),
"reprtraceback": serialize_repr_traceback(longrepr.reprtraceback),
"sections": longrepr.sections,
} # type: Dict[str, Any]
if isinstance(rep.longrepr, ExceptionChainRepr):
if isinstance(longrepr, ExceptionChainRepr):
result["chain"] = []
for repr_traceback, repr_crash, description in rep.longrepr.chain:
for repr_traceback, repr_crash, description in longrepr.chain:
result["chain"].append(
(
serialize_repr_traceback(repr_traceback),
Expand All @@ -462,7 +471,7 @@ def serialize_longrepr(rep: BaseReport) -> Dict[str, Any]:
if hasattr(report.longrepr, "reprtraceback") and hasattr(
report.longrepr, "reprcrash"
):
d["longrepr"] = serialize_longrepr(report)
d["longrepr"] = serialize_exception_longrepr(report)
else:
d["longrepr"] = str(report.longrepr)
else:
Expand Down
Loading