Skip to content

[3.13] gh-111495: Add PyFile tests (#129449) #129477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 234 additions & 0 deletions Lib/test/test_capi/test_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
import io
import os
import unittest
import warnings
from test import support
from test.support import import_helper, os_helper, warnings_helper


_testcapi = import_helper.import_module('_testcapi')
_testlimitedcapi = import_helper.import_module('_testlimitedcapi')
_io = import_helper.import_module('_io')
NULL = None
STDOUT_FD = 1

with open(__file__, 'rb') as fp:
FIRST_LINE = next(fp).decode()
FIRST_LINE_NORM = FIRST_LINE.rstrip() + '\n'


class CAPIFileTest(unittest.TestCase):
def test_pyfile_fromfd(self):
# Test PyFile_FromFd() which is a thin wrapper to _io.open()
pyfile_fromfd = _testlimitedcapi.pyfile_fromfd
filename = __file__
with open(filename, "rb") as fp:
fd = fp.fileno()

# FileIO
fp.seek(0)
obj = pyfile_fromfd(fd, filename, "rb", 0, NULL, NULL, NULL, 0)
try:
self.assertIsInstance(obj, _io.FileIO)
self.assertEqual(obj.readline(), FIRST_LINE.encode())
finally:
obj.close()

# BufferedReader
fp.seek(0)
obj = pyfile_fromfd(fd, filename, "rb", 1024, NULL, NULL, NULL, 0)
try:
self.assertIsInstance(obj, _io.BufferedReader)
self.assertEqual(obj.readline(), FIRST_LINE.encode())
finally:
obj.close()

# TextIOWrapper
fp.seek(0)
obj = pyfile_fromfd(fd, filename, "r", 1,
"utf-8", "replace", NULL, 0)
try:
self.assertIsInstance(obj, _io.TextIOWrapper)
self.assertEqual(obj.encoding, "utf-8")
self.assertEqual(obj.errors, "replace")
self.assertEqual(obj.readline(), FIRST_LINE_NORM)
finally:
obj.close()

def test_pyfile_getline(self):
# Test PyFile_GetLine(file, n): call file.readline()
# and strip "\n" suffix if n < 0.
pyfile_getline = _testlimitedcapi.pyfile_getline

# Test Unicode
with open(__file__, "r") as fp:
fp.seek(0)
self.assertEqual(pyfile_getline(fp, -1),
FIRST_LINE_NORM.rstrip('\n'))
fp.seek(0)
self.assertEqual(pyfile_getline(fp, 0),
FIRST_LINE_NORM)
fp.seek(0)
self.assertEqual(pyfile_getline(fp, 6),
FIRST_LINE_NORM[:6])

# Test bytes
with open(__file__, "rb") as fp:
fp.seek(0)
self.assertEqual(pyfile_getline(fp, -1),
FIRST_LINE.rstrip('\n').encode())
fp.seek(0)
self.assertEqual(pyfile_getline(fp, 0),
FIRST_LINE.encode())
fp.seek(0)
self.assertEqual(pyfile_getline(fp, 6),
FIRST_LINE.encode()[:6])

def test_pyfile_writestring(self):
# Test PyFile_WriteString(str, file): call file.write(str)
writestr = _testlimitedcapi.pyfile_writestring

with io.StringIO() as fp:
self.assertEqual(writestr("a\xe9\u20ac\U0010FFFF".encode(), fp), 0)
with self.assertRaises(UnicodeDecodeError):
writestr(b"\xff", fp)
with self.assertRaises(UnicodeDecodeError):
writestr("\udc80".encode("utf-8", "surrogatepass"), fp)

text = fp.getvalue()
self.assertEqual(text, "a\xe9\u20ac\U0010FFFF")

with self.assertRaises(SystemError):
writestr(b"abc", NULL)

def test_pyfile_writeobject(self):
# Test PyFile_WriteObject(obj, file, flags):
# - Call file.write(str(obj)) if flags equals Py_PRINT_RAW.
# - Call file.write(repr(obj)) otherwise.
writeobject = _testlimitedcapi.pyfile_writeobject
Py_PRINT_RAW = 1

with io.StringIO() as fp:
# Test flags=Py_PRINT_RAW
self.assertEqual(writeobject("raw", fp, Py_PRINT_RAW), 0)
writeobject(NULL, fp, Py_PRINT_RAW)

# Test flags=0
self.assertEqual(writeobject("repr", fp, 0), 0)
writeobject(NULL, fp, 0)

text = fp.getvalue()
self.assertEqual(text, "raw<NULL>'repr'<NULL>")

# invalid file type
for invalid_file in (123, "abc", object()):
with self.subTest(file=invalid_file):
with self.assertRaises(AttributeError):
writeobject("abc", invalid_file, Py_PRINT_RAW)

with self.assertRaises(TypeError):
writeobject("abc", NULL, 0)

def test_pyobject_asfiledescriptor(self):
# Test PyObject_AsFileDescriptor(obj):
# - Return obj if obj is an integer.
# - Return obj.fileno() otherwise.
# File descriptor must be >= 0.
asfd = _testlimitedcapi.pyobject_asfiledescriptor

self.assertEqual(asfd(123), 123)
self.assertEqual(asfd(0), 0)

with open(__file__, "rb") as fp:
self.assertEqual(asfd(fp), fp.fileno())

# bool emits RuntimeWarning
msg = r"bool is used as a file descriptor"
with warnings_helper.check_warnings((msg, RuntimeWarning)):
self.assertEqual(asfd(True), 1)

class FakeFile:
def __init__(self, fd):
self.fd = fd
def fileno(self):
return self.fd

# file descriptor must be positive
with self.assertRaises(ValueError):
asfd(-1)
with self.assertRaises(ValueError):
asfd(FakeFile(-1))

# fileno() result must be an integer
with self.assertRaises(TypeError):
asfd(FakeFile("text"))

# unsupported types
for obj in ("string", ["list"], object()):
with self.subTest(obj=obj):
with self.assertRaises(TypeError):
asfd(obj)

# CRASHES asfd(NULL)

def test_pyfile_newstdprinter(self):
# Test PyFile_NewStdPrinter()
pyfile_newstdprinter = _testcapi.pyfile_newstdprinter

file = pyfile_newstdprinter(STDOUT_FD)
self.assertEqual(file.closed, False)
self.assertIsNone(file.encoding)
self.assertEqual(file.mode, "w")

self.assertEqual(file.fileno(), STDOUT_FD)
self.assertEqual(file.isatty(), os.isatty(STDOUT_FD))

# flush() is a no-op
self.assertIsNone(file.flush())

# close() is a no-op
self.assertIsNone(file.close())
self.assertEqual(file.closed, False)

support.check_disallow_instantiation(self, type(file))

def test_pyfile_newstdprinter_write(self):
# Test the write() method of PyFile_NewStdPrinter()
pyfile_newstdprinter = _testcapi.pyfile_newstdprinter

filename = os_helper.TESTFN
self.addCleanup(os_helper.unlink, filename)

try:
old_stdout = os.dup(STDOUT_FD)
except OSError as exc:
# os.dup(STDOUT_FD) is not supported on WASI
self.skipTest(f"os.dup() failed with {exc!r}")

try:
with open(filename, "wb") as fp:
# PyFile_NewStdPrinter() only accepts fileno(stdout)
# or fileno(stderr) file descriptor.
fd = fp.fileno()
os.dup2(fd, STDOUT_FD)

file = pyfile_newstdprinter(STDOUT_FD)
self.assertEqual(file.write("text"), 4)
# The surrogate character is encoded with
# the "surrogateescape" error handler
self.assertEqual(file.write("[\udc80]"), 8)
finally:
os.dup2(old_stdout, STDOUT_FD)
os.close(old_stdout)

with open(filename, "r") as fp:
self.assertEqual(fp.read(), "text[\\udc80]")

# TODO: Test Py_UniversalNewlineFgets()

# PyFile_SetOpenCodeHook() and PyFile_OpenCode() are tested by
# test_embed.test_open_code_hook()


if __name__ == "__main__":
unittest.main()
51 changes: 0 additions & 51 deletions Lib/test/test_embed.py
Original file line number Diff line number Diff line change
Expand Up @@ -1955,56 +1955,5 @@ def test_presite(self):
self.assertIn("cmd", out)


class StdPrinterTests(EmbeddingTestsMixin, unittest.TestCase):
# Test PyStdPrinter_Type which is used by _PySys_SetPreliminaryStderr():
# "Set up a preliminary stderr printer until we have enough
# infrastructure for the io module in place."

STDOUT_FD = 1

def create_printer(self, fd):
ctypes = import_helper.import_module('ctypes')
PyFile_NewStdPrinter = ctypes.pythonapi.PyFile_NewStdPrinter
PyFile_NewStdPrinter.argtypes = (ctypes.c_int,)
PyFile_NewStdPrinter.restype = ctypes.py_object
return PyFile_NewStdPrinter(fd)

def test_write(self):
message = "unicode:\xe9-\u20ac-\udc80!\n"

stdout_fd = self.STDOUT_FD
stdout_fd_copy = os.dup(stdout_fd)
self.addCleanup(os.close, stdout_fd_copy)

rfd, wfd = os.pipe()
self.addCleanup(os.close, rfd)
self.addCleanup(os.close, wfd)
try:
# PyFile_NewStdPrinter() only accepts fileno(stdout)
# or fileno(stderr) file descriptor.
os.dup2(wfd, stdout_fd)

printer = self.create_printer(stdout_fd)
printer.write(message)
finally:
os.dup2(stdout_fd_copy, stdout_fd)

data = os.read(rfd, 100)
self.assertEqual(data, message.encode('utf8', 'backslashreplace'))

def test_methods(self):
fd = self.STDOUT_FD
printer = self.create_printer(fd)
self.assertEqual(printer.fileno(), fd)
self.assertEqual(printer.isatty(), os.isatty(fd))
printer.flush() # noop
printer.close() # noop

def test_disallow_instantiation(self):
fd = self.STDOUT_FD
printer = self.create_printer(fd)
support.check_disallow_instantiation(self, type(printer))


if __name__ == "__main__":
unittest.main()
2 changes: 1 addition & 1 deletion Modules/Setup.stdlib.in
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@
@MODULE__TESTBUFFER_TRUE@_testbuffer _testbuffer.c
@MODULE__TESTINTERNALCAPI_TRUE@_testinternalcapi _testinternalcapi.c _testinternalcapi/test_lock.c _testinternalcapi/pytime.c _testinternalcapi/set.c _testinternalcapi/test_critical_sections.c
@MODULE__TESTCAPI_TRUE@_testcapi _testcapimodule.c _testcapi/vectorcall.c _testcapi/heaptype.c _testcapi/abstract.c _testcapi/unicode.c _testcapi/dict.c _testcapi/set.c _testcapi/list.c _testcapi/tuple.c _testcapi/getargs.c _testcapi/datetime.c _testcapi/docstring.c _testcapi/mem.c _testcapi/watchers.c _testcapi/long.c _testcapi/float.c _testcapi/complex.c _testcapi/numbers.c _testcapi/structmember.c _testcapi/exceptions.c _testcapi/code.c _testcapi/buffer.c _testcapi/pyatomic.c _testcapi/run.c _testcapi/file.c _testcapi/codec.c _testcapi/immortal.c _testcapi/gc.c _testcapi/hash.c _testcapi/time.c _testcapi/bytes.c _testcapi/object.c _testcapi/monitoring.c
@MODULE__TESTLIMITEDCAPI_TRUE@_testlimitedcapi _testlimitedcapi.c _testlimitedcapi/abstract.c _testlimitedcapi/bytearray.c _testlimitedcapi/bytes.c _testlimitedcapi/complex.c _testlimitedcapi/dict.c _testlimitedcapi/eval.c _testlimitedcapi/float.c _testlimitedcapi/heaptype_relative.c _testlimitedcapi/import.c _testlimitedcapi/list.c _testlimitedcapi/long.c _testlimitedcapi/object.c _testlimitedcapi/pyos.c _testlimitedcapi/set.c _testlimitedcapi/sys.c _testlimitedcapi/tuple.c _testlimitedcapi/unicode.c _testlimitedcapi/vectorcall_limited.c
@MODULE__TESTLIMITEDCAPI_TRUE@_testlimitedcapi _testlimitedcapi.c _testlimitedcapi/abstract.c _testlimitedcapi/bytearray.c _testlimitedcapi/bytes.c _testlimitedcapi/complex.c _testlimitedcapi/dict.c _testlimitedcapi/eval.c _testlimitedcapi/float.c _testlimitedcapi/heaptype_relative.c _testlimitedcapi/import.c _testlimitedcapi/list.c _testlimitedcapi/long.c _testlimitedcapi/object.c _testlimitedcapi/pyos.c _testlimitedcapi/set.c _testlimitedcapi/sys.c _testlimitedcapi/tuple.c _testlimitedcapi/unicode.c _testlimitedcapi/vectorcall_limited.c _testlimitedcapi/file.c
@MODULE__TESTCLINIC_TRUE@_testclinic _testclinic.c
@MODULE__TESTCLINIC_LIMITED_TRUE@_testclinic_limited _testclinic_limited.c

Expand Down
31 changes: 31 additions & 0 deletions Modules/_testcapi/clinic/file.c.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 25 additions & 5 deletions Modules/_testcapi/file.c
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
#include "parts.h"
#include "util.h"
#include "clinic/file.c.h"


/*[clinic input]
module _testcapi
[clinic start generated code]*/
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=6361033e795369fc]*/


/*[clinic input]
_testcapi.pyfile_newstdprinter

fd: int
/

[clinic start generated code]*/

static PyObject *
_testcapi_pyfile_newstdprinter_impl(PyObject *module, int fd)
/*[clinic end generated code: output=8a2d1c57b6892db3 input=442f1824142262ea]*/
{
return PyFile_NewStdPrinter(fd);
}


static PyMethodDef test_methods[] = {
_TESTCAPI_PYFILE_NEWSTDPRINTER_METHODDEF
{NULL},
};

int
_PyTestCapi_Init_File(PyObject *m)
{
if (PyModule_AddFunctions(m, test_methods) < 0){
return -1;
}

return 0;
return PyModule_AddFunctions(m, test_methods);
}
3 changes: 3 additions & 0 deletions Modules/_testlimitedcapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,8 @@ PyInit__testlimitedcapi(void)
if (_PyTestLimitedCAPI_Init_VectorcallLimited(mod) < 0) {
return NULL;
}
if (_PyTestLimitedCAPI_Init_File(mod) < 0) {
return NULL;
}
return mod;
}
Loading
Loading