Skip to content

Commit 737ec34

Browse files
authored
REFACTOR-#2343: refactor offset, _read_rows, partitioned_file (#2344)
Signed-off-by: Anatoly Myachev <[email protected]>
1 parent a11e7c9 commit 737ec34

File tree

3 files changed

+120
-162
lines changed

3 files changed

+120
-162
lines changed

modin/engines/base/io/text/csv_reader.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,6 @@ def _read(cls, filepath_or_buffer, **kwargs):
120120
skiprows += header + 1
121121
elif hasattr(header, "__iter__") and not isinstance(header, str):
122122
skiprows += max(header) + 1
123-
cls.offset(
124-
f,
125-
nrows=skiprows,
126-
quotechar=quotechar,
127-
is_quoting=is_quoting,
128-
)
129123
if kwargs.get("encoding", None) is not None:
130124
partition_kwargs["skiprows"] = 1
131125
# Launch tasks to read partitions
@@ -163,8 +157,9 @@ def _read(cls, filepath_or_buffer, **kwargs):
163157

164158
splits = cls.partitioned_file(
165159
f,
166-
nrows=nrows,
167160
num_partitions=num_partitions,
161+
nrows=nrows,
162+
skiprows=skiprows,
168163
quotechar=quotechar,
169164
is_quoting=is_quoting,
170165
)

modin/engines/base/io/text/fwf_reader.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,6 @@ def read(cls, filepath_or_buffer, **kwargs):
116116
skiprows += header + 1
117117
elif hasattr(header, "__iter__") and not isinstance(header, str):
118118
skiprows += max(header) + 1
119-
cls.offset(
120-
f,
121-
nrows=skiprows,
122-
quotechar=quotechar,
123-
is_quoting=is_quoting,
124-
)
125119
if kwargs.get("encoding", None) is not None:
126120
partition_kwargs["skiprows"] = 1
127121
# Launch tasks to read partitions
@@ -159,8 +153,9 @@ def read(cls, filepath_or_buffer, **kwargs):
159153

160154
splits = cls.partitioned_file(
161155
f,
162-
nrows=nrows,
163156
num_partitions=num_partitions,
157+
nrows=nrows,
158+
skiprows=skiprows,
164159
quotechar=quotechar,
165160
is_quoting=is_quoting,
166161
)

modin/engines/base/io/text/text_file_reader.py

Lines changed: 116 additions & 148 deletions
Original file line numberDiff line numberDiff line change
@@ -56,219 +56,187 @@ def pathlib_or_pypath(cls, filepath_or_buffer):
5656
def offset(
5757
cls,
5858
f,
59-
nrows=None,
60-
skiprows=None,
61-
chunk_size_bytes=None,
62-
quotechar=b'"',
63-
is_quoting=True,
59+
offset_size: int,
60+
quotechar: bytes = b'"',
61+
is_quoting: bool = True,
6462
):
6563
"""
66-
Moves the file offset at the specified amount of bytes/rows.
64+
Moves the file offset at the specified amount of bytes.
6765
6866
Parameters
6967
----------
70-
f: file object
71-
nrows: int, number of rows to read. Optional, if not specified will only
72-
consider `chunk_size_bytes` parameter.
73-
chunk_size_bytes: int, Will read new rows while file pointer
74-
is less than `chunk_size_bytes`. Optional, if not specified will only
75-
consider `nrows` parameter.
76-
skiprows: array or callable (optional), specifies rows to skip
77-
quotechar: char that indicates quote in a file
78-
(optional, by default it's '\"')
79-
is_quoting: bool, Whether or not to consider quotes
80-
(optional, by default it's `True`)
68+
f: file object
69+
offset_size: int
70+
Number of bytes to read and ignore.
71+
quotechar: bytes, default b'"'
72+
Indicate quote in a file.
73+
is_quoting: bool, default True
74+
Whether or not to consider quotes.
8175
8276
Returns
8377
-------
84-
bool: If file pointer reached the end of the file, but did not find
78+
bool
79+
If file pointer reached the end of the file, but did not find
8580
closing quote returns `False`. `True` in any other case.
8681
"""
87-
assert (
88-
nrows is not None or chunk_size_bytes is not None
89-
), "`nrows` and `chunk_size_bytes` can't be None at the same time"
90-
91-
if nrows is not None or skiprows is not None:
92-
return cls._read_rows(
93-
f,
94-
nrows=nrows,
95-
skiprows=skiprows,
96-
quotechar=quotechar,
97-
is_quoting=is_quoting,
98-
max_bytes=chunk_size_bytes,
99-
)[0]
100-
101-
outside_quotes = True
10282

10383
if is_quoting:
104-
chunk = f.read(chunk_size_bytes)
105-
line = f.readline() # Ensure we read up to a newline
106-
# We need to ensure that one row isn't split across different partitions
107-
outside_quotes = not ((chunk.count(quotechar) + line.count(quotechar)) % 2)
108-
while not outside_quotes:
109-
line = f.readline()
110-
outside_quotes = line.count(quotechar) % 2
111-
if not line:
112-
break
84+
chunk = f.read(offset_size)
85+
outside_quotes = not chunk.count(quotechar) % 2
11386
else:
114-
f.seek(chunk_size_bytes, os.SEEK_CUR)
115-
f.readline()
87+
f.seek(offset_size, os.SEEK_CUR)
88+
outside_quotes = True
89+
90+
# after we read `offset_size` bytes, we most likely break the line but
91+
# the modin implementation doesn't work correctly in the case, so we must
92+
# make sure that the line is read completely to the lineterminator,
93+
# which is what the `_read_rows` does
94+
outside_quotes, _ = cls._read_rows(
95+
f,
96+
nrows=1,
97+
quotechar=quotechar,
98+
is_quoting=is_quoting,
99+
outside_quotes=outside_quotes,
100+
)
101+
116102
return outside_quotes
117103

118104
@classmethod
119105
def partitioned_file(
120106
cls,
121107
f,
122-
nrows=None,
123-
skiprows=None,
124-
num_partitions=None,
125-
quotechar=b'"',
126-
is_quoting=True,
127-
from_begin=False,
108+
num_partitions: int = None,
109+
nrows: int = None,
110+
skiprows: int = None,
111+
quotechar: bytes = b'"',
112+
is_quoting: bool = True,
128113
):
129-
"""Computes chunk sizes in bytes for every partition.
114+
"""
115+
Compute chunk sizes in bytes for every partition.
130116
131117
Parameters
132118
----------
133-
f: file to be partitioned
134-
nrows: int (optional), number of rows of file to read
135-
skiprows: array or callable (optional), specifies rows to skip
136-
num_partitions: int, for what number of partitions split a file.
137-
Optional, if not specified grabs the value from `modin.pandas.DEFAULT_NPARTITIONS`
138-
quotechar: char that indicates quote in a file
139-
(optional, by default it's '\"')
140-
is_quoting: bool, Whether or not to consider quotes
141-
(optional, by default it's `True`)
142-
from_begin: bool, Whether or not to set the file pointer to the begining of the file
143-
(optional, by default it's `False`)
119+
f: file to be partitioned
120+
num_partitions: int, optional
121+
For what number of partitions split a file.
122+
If not specified grabs the value from `modin.pandas.DEFAULT_NPARTITIONS`
123+
nrows: int, optional
124+
Number of rows of file to read.
125+
skiprows: array or callable, optional
126+
Specifies rows to skip.
127+
quotechar: bytes, default b'"'
128+
Indicate quote in a file.
129+
is_quoting: bool, default True
130+
Whether or not to consider quotes.
144131
145132
Returns
146133
-------
147-
An array, where each element of array is a tuple of two ints:
148-
beginning and the end offsets of the current chunk.
134+
An array, where each element of array is a tuple of two ints:
135+
beginning and the end offsets of the current chunk.
149136
"""
150137
if num_partitions is None:
151138
from modin.pandas import DEFAULT_NPARTITIONS
152139

153140
num_partitions = DEFAULT_NPARTITIONS
154141

155142
result = []
143+
file_size = cls.file_size(f)
156144

157-
old_position = f.tell()
158-
if from_begin:
159-
f.seek(0, os.SEEK_SET)
160-
161-
current_start = f.tell()
162-
total_bytes = cls.file_size(f)
163-
164-
# if `nrows` are specified we want to use rows as a part measure
165-
if nrows is not None:
166-
chunk_size_bytes = None
167-
rows_per_part = max(1, num_partitions, nrows // num_partitions)
168-
else:
169-
chunk_size_bytes = max(1, num_partitions, total_bytes // num_partitions)
170-
rows_per_part = None
171-
nrows = float("inf")
172-
173-
rows_readed = 0
174-
while f.tell() < total_bytes and rows_readed < nrows:
175-
if rows_per_part is not None and rows_readed + rows_per_part > nrows:
176-
rows_per_part = nrows - rows_readed
177-
178-
outside_quotes = cls.offset(
145+
if skiprows:
146+
outside_quotes, read_rows = cls._read_rows(
179147
f,
180-
nrows=rows_per_part,
181-
skiprows=skiprows,
182-
chunk_size_bytes=chunk_size_bytes,
148+
nrows=skiprows,
183149
quotechar=quotechar,
184150
is_quoting=is_quoting,
185151
)
186152

187-
result.append((current_start, f.tell()))
188-
current_start = f.tell()
189-
if rows_per_part is not None:
190-
rows_readed += rows_per_part
191-
192-
if is_quoting and not outside_quotes:
193-
warnings.warn("File has mismatched quotes")
194-
195-
f.seek(old_position, os.SEEK_SET)
153+
start = f.tell()
154+
155+
if nrows:
156+
read_rows_counter = 0
157+
partition_size = max(1, num_partitions, nrows // num_partitions)
158+
while f.tell() < file_size and read_rows_counter < nrows:
159+
if read_rows_counter + partition_size > nrows:
160+
# it's possible only if is_quoting==True
161+
partition_size = nrows - read_rows_counter
162+
outside_quotes, read_rows = cls._read_rows(
163+
f,
164+
nrows=partition_size,
165+
quotechar=quotechar,
166+
is_quoting=is_quoting,
167+
)
168+
result.append((start, f.tell()))
169+
start = f.tell()
170+
read_rows_counter += read_rows
171+
172+
# add outside_quotes
173+
if is_quoting and not outside_quotes:
174+
warnings.warn("File has mismatched quotes")
175+
else:
176+
partition_size = max(1, num_partitions, file_size // num_partitions)
177+
while f.tell() < file_size:
178+
outside_quotes = cls.offset(
179+
f,
180+
offset_size=partition_size,
181+
quotechar=quotechar,
182+
is_quoting=is_quoting,
183+
)
184+
185+
result.append((start, f.tell()))
186+
start = f.tell()
187+
188+
# add outside_quotes
189+
if is_quoting and not outside_quotes:
190+
warnings.warn("File has mismatched quotes")
196191

197192
return result
198193

199194
@classmethod
200195
def _read_rows(
201196
cls,
202197
f,
203-
nrows=None,
204-
skiprows=None,
205-
quotechar=b'"',
206-
is_quoting=True,
207-
max_bytes=None,
198+
nrows: int,
199+
quotechar: bytes = b'"',
200+
is_quoting: bool = True,
201+
outside_quotes: bool = True,
208202
):
209203
"""
210-
Moves the file offset at the specified amount of rows
211-
Note: the difference between `offset` is that `_read_rows` is more
212-
specific version of `offset` which is focused of reading **rows**.
213-
In common case it's better to use `offset`.
204+
Move the file offset at the specified amount of rows.
214205
215206
Parameters
216207
----------
217-
f: file object
218-
nrows: int, number of rows to read. Optional, if not specified will only
219-
consider `max_bytes` parameter.
220-
skiprows: int, array or callable (optional), specifies rows to skip
221-
quotechar: char that indicates quote in a file
222-
(optional, by default it's '\"')
223-
is_quoting: bool, Whether or not to consider quotes
224-
(optional, by default it's `True`)
225-
max_bytes: int, Will read new rows while file pointer
226-
is less than `max_bytes`. Optional, if not specified will only
227-
consider `nrows` parameter, if both not specified will read till
228-
the end of the file.
208+
f: file object
209+
nrows: int
210+
Number of rows to read.
211+
quotechar: bytes, default b'"'
212+
Indicate quote in a file.
213+
is_quoting: bool, default True
214+
Whether or not to consider quotes.
215+
outside_quotes: bool, default True
216+
Whether the file pointer is within quotes or not at the time this function is called.
229217
230218
Returns
231219
-------
232-
tuple of bool and int,
233-
bool: If file pointer reached the end of the file, but did not find
220+
tuple of bool and int,
221+
bool: If file pointer reached the end of the file, but did not find
234222
closing quote returns `False`. `True` in any other case.
235-
int: Number of rows that was readed.
223+
int: Number of rows that was read.
236224
"""
237-
assert skiprows is None or isinstance(
238-
skiprows, int
239-
), f"Skiprows as a {type(skiprows)} is not supported yet."
240-
241-
if nrows is None and max_bytes is None:
242-
max_bytes = float("inf")
243-
244225
if nrows is not None and nrows <= 0:
245226
return True, 0
246227

247-
# we need this condition to avoid unnecessary checks in `stop_condition`
248-
# which executes in a huge for loop
249-
if nrows is not None and max_bytes is None:
250-
stop_condition = lambda rows_readed: rows_readed >= nrows # noqa (E731)
251-
elif nrows is not None and max_bytes is not None:
252-
stop_condition = (
253-
lambda rows_readed: f.tell() >= max_bytes or rows_readed >= nrows
254-
) # noqa (E731)
255-
else:
256-
stop_condition = lambda rows_readed: f.tell() >= max_bytes # noqa (E731)
257-
258-
if max_bytes is not None:
259-
max_bytes = max_bytes + f.tell()
228+
rows_read = 0
260229

261-
rows_readed = 0
262-
outside_quotes = True
263230
for line in f:
264231
if is_quoting and line.count(quotechar) % 2:
265232
outside_quotes = not outside_quotes
266233
if outside_quotes:
267-
rows_readed += 1
268-
if stop_condition(rows_readed):
234+
rows_read += 1
235+
if rows_read >= nrows:
269236
break
270237

238+
# case when EOF
271239
if not outside_quotes:
272-
rows_readed += 1
240+
rows_read += 1
273241

274-
return outside_quotes, rows_readed
242+
return outside_quotes, rows_read

0 commit comments

Comments
 (0)