Skip to content

Commit 5e52122

Browse files
authored
[c++/python] Revert 3764 for global-order writes (#3886)
1 parent ef96798 commit 5e52122

File tree

12 files changed

+56
-226
lines changed

12 files changed

+56
-226
lines changed

apis/python/src/tiledbsoma/_dataframe.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from . import pytiledbsoma as clib
3030
from ._constants import SOMA_GEOMETRY, SOMA_JOINID
3131
from ._exception import SOMAError, map_exception_for_create
32-
from ._read_iters import TableReadIter
32+
from ._read_iters import ManagedQuery, TableReadIter
3333
from ._soma_array import SOMAArray
3434
from ._tdb_handles import DataFrameWrapper
3535
from ._types import (
@@ -801,7 +801,12 @@ def write(
801801
"TileDBWriteOptions instead of TileDBCreateOptions"
802802
)
803803
write_options = TileDBWriteOptions.from_platform_config(platform_config)
804-
self._write_table(values, write_options.sort_coords)
804+
sort_coords = write_options.sort_coords
805+
806+
for batch in values.to_batches():
807+
mq = ManagedQuery(self)
808+
mq._handle.set_array_data(batch)
809+
mq._handle.submit_write(sort_coords or False)
805810

806811
if write_options.consolidate_and_vacuum:
807812
self._handle._handle.consolidate_and_vacuum()

apis/python/src/tiledbsoma/_dense_nd_array.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,6 @@ def write(
315315
_util._set_coords(mq, new_coords)
316316
mq._handle.set_column_data("soma_data", input)
317317
mq._handle.submit_write()
318-
mq._handle.finalize()
319318

320319
tiledb_write_options = TileDBWriteOptions.from_platform_config(platform_config)
321320
if tiledb_write_options.consolidate_and_vacuum:

apis/python/src/tiledbsoma/_geometry_dataframe.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
_revise_domain_for_extent,
3939
)
4040
from ._exception import SOMAError, map_exception_for_create
41-
from ._read_iters import TableReadIter
41+
from ._read_iters import ManagedQuery, TableReadIter
4242
from ._spatial_dataframe import SpatialDataFrame
4343
from ._spatial_util import (
4444
coordinate_space_from_json,
@@ -498,16 +498,24 @@ def write(
498498
_util.check_type("values", values, (pa.Table,))
499499

500500
write_options: Union[TileDBCreateOptions, TileDBWriteOptions]
501+
sort_coords = None
501502
if isinstance(platform_config, TileDBCreateOptions):
502503
raise ValueError(
503504
"As of TileDB-SOMA 1.13, the write method takes "
504505
"TileDBWriteOptions instead of TileDBCreateOptions"
505506
)
506507
write_options = TileDBWriteOptions.from_platform_config(platform_config)
507-
self._write_table(values, write_options.sort_coords)
508+
sort_coords = write_options.sort_coords
509+
510+
clib_dataframe = self._handle._handle
511+
512+
for batch in values.to_batches():
513+
mq = ManagedQuery(self, None)
514+
mq._handle.set_array_data(batch)
515+
mq._handle.submit_write(sort_coords or False)
508516

509517
if write_options.consolidate_and_vacuum:
510-
self._handle._handle.consolidate_and_vacuum()
518+
clib_dataframe.consolidate_and_vacuum()
511519

512520
return self
513521

apis/python/src/tiledbsoma/_point_cloud_dataframe.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
_revise_domain_for_extent,
3131
)
3232
from ._exception import SOMAError, map_exception_for_create
33-
from ._read_iters import TableReadIter
33+
from ._read_iters import ManagedQuery, TableReadIter
3434
from ._spatial_dataframe import SpatialDataFrame
3535
from ._spatial_util import (
3636
coordinate_space_from_json,
@@ -480,10 +480,16 @@ def write(
480480
)
481481
write_options = TileDBWriteOptions.from_platform_config(platform_config)
482482
sort_coords = write_options.sort_coords
483-
self._write_table(values, sort_coords)
483+
484+
clib_dataframe = self._handle._handle
485+
486+
for batch in values.to_batches():
487+
mq = ManagedQuery(self, None)
488+
mq._handle.set_array_data(batch)
489+
mq._handle.submit_write(sort_coords or False)
484490

485491
if write_options.consolidate_and_vacuum:
486-
self._handle._handle.consolidate_and_vacuum()
492+
clib_dataframe.consolidate_and_vacuum()
487493

488494
return self
489495

apis/python/src/tiledbsoma/_soma_array.py

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
# This package's pybind11 code
1313
from . import pytiledbsoma as clib # noqa: E402
14-
from ._read_iters import ManagedQuery
1514
from ._soma_object import SOMAObject
1615

1716

@@ -160,58 +159,3 @@ def _maxdomain(self) -> Tuple[Tuple[Any, Any], ...]:
160159
resized up to core (max) domain.
161160
"""
162161
return self._handle.maxdomain
163-
164-
def _write_table(self, values: pa.Table, sort_coords: bool) -> None:
165-
"""Helper function that sets the correct result order for the layout
166-
and allows for multiple submissions before calling `submit` and `finalize`
167-
for unordered write or `submit_and_finalize` for global order writes.
168-
169-
Args:
170-
values:
171-
An `Arrow table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`_
172-
containing all columns, including the index columns. The schema
173-
for the values must match the schema for the :class:`DataFrame`.
174-
175-
If a column is of categorical type in the schema and a
176-
flattened/non-categorical column is presented for data on write,
177-
a ``ValueError`` is raised. If a column is of non-categorical
178-
type in the schema and a categorical column is presented for data
179-
on write, the data are written as an array of category values,
180-
and the category-type information is not saved.
181-
sort_coords:
182-
Whether the coordinates need to be sorted (True) or are already
183-
sorted in global order (False). In the PlatformConfig, this is
184-
is to True by default.
185-
"""
186-
batches = values.to_batches()
187-
if not batches:
188-
return
189-
190-
layout = (
191-
clib.ResultOrder.unordered if sort_coords else clib.ResultOrder.globalorder
192-
)
193-
194-
if layout == clib.ResultOrder.unordered:
195-
for batch in batches:
196-
# Create new ManagedQuery per each batch
197-
mq = ManagedQuery(self)._handle
198-
mq.set_layout(layout)
199-
mq.set_array_data(batch)
200-
201-
# Submit and flush every batch
202-
mq.submit_write()
203-
mq.finalize()
204-
205-
else: # global order
206-
# Create a single ManagedQuery at the beginning
207-
mq = ManagedQuery(self)._handle
208-
mq.set_layout(layout)
209-
210-
# Submit for each batch but don't finalize
211-
for batch in batches[:-1]:
212-
mq.set_array_data(batch)
213-
mq.submit_write()
214-
215-
# Only finalize at the last batch
216-
mq.set_array_data(batches[-1])
217-
mq.submit_and_finalize()

apis/python/src/tiledbsoma/_sparse_nd_array.py

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -308,12 +308,14 @@ def write(
308308
"""
309309

310310
write_options: Union[TileDBCreateOptions, TileDBWriteOptions]
311+
sort_coords = None
311312
if isinstance(platform_config, TileDBCreateOptions):
312313
raise ValueError(
313314
"As of TileDB-SOMA 1.13, the write method takes "
314315
"TileDBWriteOptions instead of TileDBCreateOptions"
315316
)
316317
write_options = TileDBWriteOptions.from_platform_config(platform_config)
318+
sort_coords = write_options.sort_coords
317319

318320
clib_sparse_array = self._handle._handle
319321

@@ -322,14 +324,6 @@ def write(
322324
data, coords = values.to_numpy()
323325

324326
mq = ManagedQuery(self, platform_config)
325-
326-
layout = (
327-
clib.ResultOrder.unordered
328-
if write_options.sort_coords
329-
else clib.ResultOrder.globalorder
330-
)
331-
mq._handle.set_layout(layout)
332-
333327
for i, c in enumerate(coords.T):
334328
mq._handle.set_column_data(
335329
f"soma_dim_{i}",
@@ -344,12 +338,7 @@ def write(
344338
data, dtype=self.schema.field("soma_data").type.to_pandas_dtype()
345339
),
346340
)
347-
348-
if layout == clib.ResultOrder.unordered:
349-
mq._handle.submit_write()
350-
mq._handle.finalize()
351-
else:
352-
mq._handle.submit_and_finalize()
341+
mq._handle.submit_write(sort_coords or True)
353342

354343
if write_options.consolidate_and_vacuum:
355344
# Consolidate non-bulk data
@@ -366,14 +355,6 @@ def write(
366355
sp = values.to_scipy().tocoo()
367356

368357
mq = ManagedQuery(self, platform_config)
369-
370-
layout = (
371-
clib.ResultOrder.unordered
372-
if write_options.sort_coords
373-
else clib.ResultOrder.globalorder
374-
)
375-
mq._handle.set_layout(layout)
376-
377358
for i, c in enumerate([sp.row, sp.col]):
378359
mq._handle.set_column_data(
379360
f"soma_dim_{i}",
@@ -388,20 +369,20 @@ def write(
388369
sp.data, dtype=self.schema.field("soma_data").type.to_pandas_dtype()
389370
),
390371
)
391-
392-
if layout == clib.ResultOrder.unordered:
393-
mq._handle.submit_write()
394-
mq._handle.finalize()
395-
else:
396-
mq._handle.submit_and_finalize()
372+
mq._handle.submit_write(sort_coords or True)
397373

398374
if write_options.consolidate_and_vacuum:
399375
# Consolidate non-bulk data
400376
clib_sparse_array.consolidate_and_vacuum()
401377
return self
402378

403379
if isinstance(values, pa.Table):
404-
self._write_table(values, write_options.sort_coords)
380+
# Write bulk data
381+
for batch in values.to_batches():
382+
# clib_sparse_array.write(batch, sort_coords or False)
383+
mq = ManagedQuery(self, None)
384+
mq._handle.set_array_data(batch)
385+
mq._handle.submit_write(sort_coords or False)
405386

406387
if write_options.consolidate_and_vacuum:
407388
# Consolidate non-bulk data

apis/python/src/tiledbsoma/managed_query.cc

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -156,39 +156,16 @@ void load_managed_query(py::module& m) {
156156
}
157157
py::gil_scoped_acquire acquire;
158158
})
159-
.def("reset_columns", &ManagedQuery::reset_columns)
160-
161159
.def(
162160
"submit_write",
163-
[](ManagedQuery& mq) {
164-
try {
165-
mq.submit_write();
166-
} catch (const std::exception& e) {
167-
TPY_ERROR_LOC(e.what());
168-
}
169-
},
170-
py::call_guard<py::gil_scoped_release>())
171-
172-
.def(
173-
"submit_and_finalize",
174-
[](ManagedQuery& mq) {
175-
try {
176-
mq.submit_and_finalize();
177-
} catch (const std::exception& e) {
178-
TPY_ERROR_LOC(e.what());
179-
}
180-
},
181-
py::call_guard<py::gil_scoped_release>())
182-
183-
.def(
184-
"finalize",
185-
[](ManagedQuery& mq) {
161+
[](ManagedQuery& mq, bool sort_coords) {
186162
try {
187-
mq.finalize();
163+
mq.submit_write(sort_coords);
188164
} catch (const std::exception& e) {
189165
TPY_ERROR_LOC(e.what());
190166
}
191167
},
168+
"sort_coords"_a = false,
192169
py::call_guard<py::gil_scoped_release>())
193170

194171
.def("reset", &ManagedQuery::reset)

apis/python/src/tiledbsoma/pytiledbsoma.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,7 @@ PYBIND11_MODULE(pytiledbsoma, m) {
6868
py::enum_<ResultOrder>(m, "ResultOrder")
6969
.value("automatic", ResultOrder::automatic)
7070
.value("rowmajor", ResultOrder::rowmajor)
71-
.value("colmajor", ResultOrder::colmajor)
72-
.value("unordered", ResultOrder::unordered)
73-
.value("globalorder", ResultOrder::global);
71+
.value("colmajor", ResultOrder::colmajor);
7472

7573
py::enum_<URIType>(m, "URIType")
7674
.value("automatic", URIType::automatic)

apis/python/tests/test_dataframe.py

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2442,58 +2442,3 @@ def nan_check(expected_nan, dict_vals):
24422442
assert nan_check(signaling_nan, actual_dict_vals)
24432443
assert_array_equal(expected_data1["A"], actual_data[:4])
24442444
assert_array_equal(expected_data2["A"], actual_data[4:])
2445-
2446-
2447-
def test_fragments_in_writes(tmp_path):
2448-
uri = tmp_path.as_posix()
2449-
2450-
# --- three dataframes, all with identical schema
2451-
df_0 = pd.DataFrame(
2452-
{
2453-
"soma_joinid": pd.Series([0, 1, 2, 3], dtype=np.int64),
2454-
"obs": pd.Series(["A", "B", "A", "B"], dtype="str"),
2455-
}
2456-
)
2457-
df_1 = pd.DataFrame(
2458-
{
2459-
"soma_joinid": pd.Series([4, 5, 6, 7], dtype=np.int64),
2460-
"obs": pd.Series(["A", "A", "B", "B"], dtype="str"),
2461-
}
2462-
)
2463-
df_2 = pd.DataFrame(
2464-
{
2465-
"soma_joinid": pd.Series([8, 9, 10, 11], dtype=np.int64),
2466-
"obs": pd.Series(["B", "C", "B", "C"], dtype="str"),
2467-
}
2468-
)
2469-
expected_df = pd.concat([df_0, df_1, df_2], ignore_index=True)
2470-
2471-
soma.DataFrame.create(
2472-
uri,
2473-
schema=pa.Schema.from_pandas(df_0, preserve_index=False),
2474-
domain=[[0, 11]],
2475-
).close()
2476-
2477-
with soma.open(uri, mode="w") as A:
2478-
# Three-chunk table
2479-
A.write(
2480-
pa.concat_tables(
2481-
[
2482-
pa.Table.from_pandas(df_0, preserve_index=False),
2483-
pa.Table.from_pandas(df_1, preserve_index=False),
2484-
pa.Table.from_pandas(df_2, preserve_index=False),
2485-
]
2486-
),
2487-
platform_config=soma.TileDBWriteOptions(**{"sort_coords": False}),
2488-
)
2489-
2490-
# There should be a single fragment even though there are three chunks (and
2491-
# therefore three submits) in the array because we only finalize once at
2492-
# the end
2493-
assert len(list((Path(uri) / "__commits").iterdir())) == 1
2494-
assert len(list((Path(uri) / "__fragments").iterdir())) == 1
2495-
2496-
with soma.open(uri) as A:
2497-
df = A.read().concat().to_pandas()
2498-
2499-
assert df.equals(expected_df)

0 commit comments

Comments
 (0)