diff --git a/coverage/sqldata.py b/coverage/sqldata.py index 77577437e..12676d0bb 100644 --- a/coverage/sqldata.py +++ b/coverage/sqldata.py @@ -615,6 +615,42 @@ def touch_files(self, filenames: Collection[str], plugin_name: Optional[str] = N # Set the tracer for this file self.add_file_tracers({filename: plugin_name}) + def purge_files(self, filenames: Iterable[str], context: Optional[str] = None) -> None: + """Purge any existing coverage data for the given `filenames`. + + If `context` is given, purge only data associated with that measurement context. + """ + + if self._debug.should("dataop"): + self._debug.write(f"Purging {filenames!r} for context {context}") + self._start_using() + with self._connect() as con: + + if context is not None: + context_id = self._context_id(context) + if context_id is None: + raise DataError("Unknown context {context}") + else: + context_id = None + + if self._has_lines: + table = 'line_bits' + elif self._has_arcs: + table = 'arcs' + else: + return + + for filename in filenames: + file_id = self._file_id(filename, add=False) + if file_id is None: + continue + self._file_map.pop(filename, None) + if context_id is None: + q = f'delete from {table} where file_id={file_id}' + else: + q = f'delete from {table} where file_id={file_id} and context_id={context_id}' + con.execute(q) + def update(self, other_data: CoverageData, aliases: Optional[PathAliases] = None) -> None: """Update this data with data from several other :class:`CoverageData` instances. diff --git a/tests/test_api.py b/tests/test_api.py index 1c5654216..885831550 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -754,6 +754,112 @@ def test_run_debug_sys(self) -> None: cov.stop() # pragma: nested assert cast(str, d['data_file']).endswith(".coverage") + def test_purge_filenames(self) -> None: + + fn1 = self.make_file("mymain.py", """\ + import mymod + a = 1 + """) + fn1 = os.path.join(self.temp_dir, fn1) + + fn2 = self.make_file("mymod.py", """\ + fooey = 17 + """) + fn2 = os.path.join(self.temp_dir, fn2) + + cov = coverage.Coverage() + self.start_import_stop(cov, "mymain") + + data = cov.get_data() + + # Initial measurement was for two files + assert len(data.measured_files()) == 2 + assert [1, 2] == sorted_lines(data, fn1) + assert [1,] == sorted_lines(data, fn2) + + # Purge one file's data and one should remain + data.purge_files([fn1]) + assert len(data.measured_files()) == 1 + assert [] == sorted_lines(data, fn1) + assert [1,] == sorted_lines(data, fn2) + + # Purge second file's data and none should remain + data.purge_files([fn2]) + assert len(data.measured_files()) == 0 + assert [] == sorted_lines(data, fn1) + assert [] == sorted_lines(data, fn2) + + def test_purge_filenames_context(self) -> None: + + fn1 = self.make_file("mymain.py", """\ + import mymod + a = 1 + """) + fn1 = os.path.join(self.temp_dir, fn1) + + fn2 = self.make_file("mymod.py", """\ + fooey = 17 + """) + fn2 = os.path.join(self.temp_dir, fn2) + + def dummy_function() -> None: + unused = 42 + + # Start/stop since otherwise cantext + cov = coverage.Coverage() + cov.start() + cov.switch_context('initialcontext') + dummy_function() + cov.switch_context('testcontext') + cov.stop() + self.start_import_stop(cov, "mymain") + + data = cov.get_data() + + # Initial measurement was for three files and two contexts + assert len(data.measured_files()) == 3 + assert [1, 2] == sorted_lines(data, fn1) + assert [1,] == sorted_lines(data, fn2) + assert len(sorted_lines(data, __file__)) == 1 + assert len(data.measured_contexts()) == 2 + + # Remove specifying wrong context should raise exception and not remove anything + try: + data.purge_files([fn1], 'wrongcontext') + except coverage.sqldata.DataError: + pass + else: + assert 0, "exception expected" + assert len(data.measured_files()) == 3 + assert [1, 2] == sorted_lines(data, fn1) + assert [1,] == sorted_lines(data, fn2) + assert len(sorted_lines(data, __file__)) == 1 + assert len(data.measured_contexts()) == 2 + + # Remove one file specifying correct context + data.purge_files([fn1], 'testcontext') + assert len(data.measured_files()) == 2 + assert [] == sorted_lines(data, fn1) + assert [1,] == sorted_lines(data, fn2) + assert len(sorted_lines(data, __file__)) == 1 + assert len(data.measured_contexts()) == 2 + + # Remove second file with other correct context + data.purge_files([__file__], 'initialcontext') + assert len(data.measured_files()) == 1 + assert [] == sorted_lines(data, fn1) + assert [1,] == sorted_lines(data, fn2) + assert len(sorted_lines(data, __file__)) == 0 + assert len(data.measured_contexts()) == 2 + + # Remove last file specifying correct context + data.purge_files([fn2], 'testcontext') + assert len(data.measured_files()) == 0 + assert [] == sorted_lines(data, fn1) + assert [] == sorted_lines(data, fn2) + assert len(sorted_lines(data, __file__)) == 0 + assert len(data.measured_contexts()) == 2 + class CurrentInstanceTest(CoverageTest): """Tests of Coverage.current()."""