Merged (changes from 1 commit)
4 changes: 3 additions & 1 deletion dlt/common/data_writers/buffered.py
@@ -46,6 +46,7 @@ def __init__(
file_max_bytes: int = None,
disable_compression: bool = False,
_caps: DestinationCapabilitiesContext = None,
**writer_kwargs,
Review comment (Collaborator): I do not see this used. Do you need it for testing? We pass csv options via config injection (you do it correctly below).
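For reference, "config injection" here means the csv options are read from dlt configuration rather than passed as constructor kwargs. A minimal sketch of the TOML form, inferred from the `NORMALIZE__DATA_WRITER__*` environment variables in the docs below (the section name is an assumption based on that env-var mapping):

```toml
# config.toml -- hypothetical snippet; keys mirror the csv.md docs changed in this PR
[normalize.data_writer]
delimiter = "|"
include_header = false
quoting = "quote_all"
line_ending = "crlf"  # the option this PR adds
```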

):
self.writer_spec = writer_spec
if self.writer_spec.requires_destination_capabilities and not _caps:
@@ -77,6 +78,7 @@ def __init__(
self._created: float = None
self._last_modified: float = None
self._closed = False
self._writer_kwargs = writer_kwargs
try:
self._rotate_file()
except TypeError:
@@ -243,7 +245,7 @@ def _flush_items(self, allow_empty_file: bool = False) -> None:
self._file = self.open(self._file_name, "wb") # type: ignore
else:
self._file = self.open(self._file_name, "wt", encoding="utf-8", newline="")
self._writer = self.writer_cls(self._file, caps=self._caps) # type: ignore[assignment]
self._writer = self.writer_cls(self._file, caps=self._caps, **self._writer_kwargs) # type: ignore[assignment]
self._writer.write_header(self._current_columns)
# write buffer
if self._buffered_items:
2 changes: 2 additions & 0 deletions dlt/common/data_writers/configuration.py
@@ -3,13 +3,15 @@
from dlt.common.configuration.specs import BaseConfiguration

CsvQuoting = Literal["quote_all", "quote_needed"]
CsvLineEnding = Literal["lf", "crlf"]


@configspec
class CsvFormatConfiguration(BaseConfiguration):
delimiter: str = ","
include_header: bool = True
quoting: CsvQuoting = "quote_needed"
line_ending: CsvLineEnding = "lf"

# read options
on_error_continue: bool = False
26 changes: 22 additions & 4 deletions dlt/common/data_writers/writers.py
@@ -29,6 +29,7 @@
CsvFormatConfiguration,
CsvQuoting,
ParquetFormatConfiguration,
CsvLineEnding,
)
from dlt.common.destination import (
DestinationCapabilitiesContext,
@@ -418,12 +419,14 @@ def __init__(
delimiter: str = ",",
include_header: bool = True,
quoting: CsvQuoting = "quote_needed",
line_ending: CsvLineEnding = "lf",
bytes_encoding: str = "utf-8",
) -> None:
super().__init__(f, caps)
self.include_header = include_header
self.delimiter = delimiter
self.quoting: CsvQuoting = quoting
self.line_ending = line_ending
self.writer: csv.DictWriter[str] = None
self.bytes_encoding = bytes_encoding

@@ -436,11 +439,15 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
else:
raise ValueError(self.quoting)

# Create a custom dialect based on unix_dialect
class CustomDialect(csv.unix_dialect):
lineterminator = "\r\n" if self.line_ending == "crlf" else "\n"

self.writer = csv.DictWriter(
self._f,
fieldnames=list(columns_schema.keys()),
extrasaction="ignore",
dialect=csv.unix_dialect,
dialect=CustomDialect,
delimiter=self.delimiter,
quoting=quoting,
)
@@ -550,12 +557,14 @@ def __init__(
delimiter: str = ",",
include_header: bool = True,
quoting: CsvQuoting = "quote_needed",
line_ending: CsvLineEnding = "lf",
) -> None:
super().__init__(f, caps)
self.delimiter = delimiter
self._delimiter_b = delimiter.encode("ascii")
self.include_header = include_header
self.quoting: CsvQuoting = quoting
self.line_ending = line_ending
self.writer: Any = None

def write_header(self, columns_schema: TTableSchemaColumns) -> None:
@@ -564,6 +573,10 @@ def write_header(self, columns_schema: TTableSchemaColumns) -> None:
def write_data(self, items: Sequence[TDataItem]) -> None:
from dlt.common.libs.pyarrow import pyarrow
import pyarrow.csv

# derive (major, minor) from the pyarrow version; newer write options are gated on it below
pyarrow_version = tuple(map(int, pyarrow.__version__.split(".")[:2]))

for item in items:
if isinstance(item, (pyarrow.Table, pyarrow.RecordBatch)):
@@ -574,14 +587,19 @@ def write_data(self, items: Sequence[TDataItem]) -> None:
quoting = "all_valid"
else:
raise ValueError(self.quoting)
write_options_kwargs = dict(
include_header=self.include_header,
delimiter=self._delimiter_b,
quoting_style=quoting,
)
if pyarrow_version >= (14, 0):
write_options_kwargs["eol"] = self.line_ending.encode("ascii")
try:
self.writer = pyarrow.csv.CSVWriter(
self._f,
item.schema,
write_options=pyarrow.csv.WriteOptions(
include_header=self.include_header,
delimiter=self._delimiter_b,
quoting_style=quoting,
**write_options_kwargs
),
)
self._first_schema = item.schema
5 changes: 4 additions & 1 deletion docs/website/docs/dlt-ecosystem/file-formats/csv.md
@@ -27,7 +27,7 @@ The CSV format is supported by the following destinations: **Postgres**, **Files
* separators are commas
* quotes are **"** and are escaped as **""**
* `NULL` values are both empty strings and empty tokens as in the example below
* UNIX new lines are used
* UNIX new lines (LF) are used by default
* dates are represented as ISO 8601
* quoting style is "when needed"

@@ -50,6 +50,7 @@ with standard settings:
* delimiter: change the delimiting character (default: ',')
* include_header: include the header row (default: True)
* quoting: **quote_all** - all values are quoted, **quote_needed** - quote only values that need quoting (default: `quote_needed`)
* line_ending: **lf** - use UNIX line endings (default), **crlf** - use Windows line endings

When **quote_needed** is selected, the Python csv writer quotes all non-numeric values. The pyarrow csv writer's exact behavior is not described in its documentation; we observed that in some cases strings are left unquoted as well.
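To make the Python csv writer's behavior concrete, a minimal stdlib sketch (assuming `quote_needed` maps to `csv.QUOTE_NONNUMERIC`, which matches the "all non-numeric values are quoted" behavior described above):

```py
import csv
import io

buf = io.StringIO()
# QUOTE_NONNUMERIC quotes every field that is not a number
writer = csv.writer(buf, quoting=csv.QUOTE_NONNUMERIC)
writer.writerow(["text", 1, 2.5])
print(buf.getvalue())  # '"text",1,2.5\r\n' -- only the string is quoted
```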

@@ -58,6 +59,7 @@ When **quote_needed** is selected: in the case of the Python csv writer, all non
delimiter="|"
include_header=false
quoting="quote_all"
line_ending="crlf"
```

Or using environment variables:
@@ -66,6 +68,7 @@
NORMALIZE__DATA_WRITER__DELIMITER=|
NORMALIZE__DATA_WRITER__INCLUDE_HEADER=False
NORMALIZE__DATA_WRITER__QUOTING=quote_all
NORMALIZE__DATA_WRITER__LINE_ENDING=crlf
```

### Destination settings
9 changes: 6 additions & 3 deletions pytest.ini
@@ -1,14 +1,17 @@
[pytest]
pythonpath= dlt docs/website/docs
norecursedirs= .direnv .eggs build dist
addopts= -v --showlocals --durations 10
norecursedirs= .direnv .eggs build dist .git .tox .env .venv venv env
addopts= -v --showlocals --durations 10 --log-cli-level=INFO
xfail_strict= true
log_cli= 1
log_cli_level= INFO
python_files = test_*.py *_test.py *snippets.py *snippet.pytest
python_functions = *_test test_* *_snippet
filterwarnings= ignore::DeprecationWarning
markers =
essential: marks all essential tests
essential: marks tests as essential
forked: marks tests that need to run in a forked process
asyncio: marks tests that use asyncio
mssql: marks tests that require MSSQL
no_load: marks tests that do not load anything

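Registering markers in `pytest.ini` keeps pytest from emitting unknown-mark warnings; a test opts in with the usual decorator (hypothetical example):

```py
import pytest

@pytest.mark.essential
def test_buffered_writer_smoke() -> None:
    # marked essential so it runs in the essential test subset
    assert True
```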
30 changes: 30 additions & 0 deletions tests/common/data_writers/test_data_writers.py
@@ -273,3 +273,33 @@ def test_import_file_writer() -> None:
w_ = writer(None)
with pytest.raises(NotImplementedError):
w_.write_header(None)


def test_csv_writer_line_endings() -> None:
"""Test that CSV writer correctly handles different line endings"""
from dlt.common.data_writers.writers import CsvWriter
from io import StringIO

# Test LF line endings (default)
f_lf = StringIO()
writer_lf = CsvWriter(f_lf, line_ending="lf")
writer_lf.write_header({"col1": {"name": "col1", "data_type": "text"}})
writer_lf.write_data([{"col1": "value1"}, {"col1": "value2"}])
writer_lf.close()
content_lf = f_lf.getvalue()
assert content_lf.count("\n") == 3 # header + 2 rows
assert content_lf.count("\r") == 0 # no CR characters

# Test CRLF line endings
f_crlf = StringIO()
writer_crlf = CsvWriter(f_crlf, line_ending="crlf")
writer_crlf.write_header({"col1": {"name": "col1", "data_type": "text"}})
writer_crlf.write_data([{"col1": "value1"}, {"col1": "value2"}])
writer_crlf.close()
content_crlf = f_crlf.getvalue()
assert content_crlf.count("\r\n") == 3 # header + 2 rows
# Ensure all line endings are CRLF and there are no isolated LF or CR
assert "\r\n" in content_crlf
assert "\r" not in content_crlf.replace("\r\n", "")
assert "\n" not in content_crlf.replace("\r\n", "")
2 changes: 2 additions & 0 deletions tests/common/data_writers/utils.py
@@ -22,6 +22,7 @@ def get_writer(
file_max_bytes: Optional[int] = None,
disable_compression: bool = False,
caps: DestinationCapabilitiesContext = None,
**writer_kwargs,
) -> BufferedDataWriter[TWriter]:
caps = caps or DestinationCapabilitiesContext.generic_capabilities()
writer_spec = writer.writer_spec()
@@ -35,4 +36,5 @@
file_max_bytes=file_max_bytes,
disable_compression=disable_compression,
_caps=caps,
**writer_kwargs,
)
26 changes: 26 additions & 0 deletions tests/libs/test_csv_writer.py
@@ -219,3 +219,29 @@ def assert_csv_rows(csv_row: Dict[str, Any], expected_row: Dict[str, Any]) -> No
assert actual[1] == expected, print(
f"Failed on {actual[0]}: actual: {actual[1]} vs expected: {expected}"
)


@pytest.mark.parametrize("line_ending", ["lf", "crlf"])
@pytest.mark.parametrize("writer_type", [CsvWriter, ArrowToCsvWriter])
def test_csv_line_endings(writer_type: Type[DataWriter], line_ending: str) -> None:
data = copy(TABLE_ROW_ALL_DATA_TYPES_DATETIMES)

if writer_type == ArrowToCsvWriter:
# write parquet and read it
with get_writer(ParquetDataWriter) as pq_writer:
pq_writer.write_data_item([data], TABLE_UPDATE_COLUMNS_SCHEMA)

with open(pq_writer.closed_files[0].file_path, "rb") as f:
table = pq.read_table(f)
data = table

with get_writer(writer_type, disable_compression=True, line_ending=line_ending) as writer:
writer.write_data_item(data, TABLE_UPDATE_COLUMNS_SCHEMA)

with open(writer.closed_files[0].file_path, "rb") as f:
content = f.read()
expected_eol = b"\r\n" if line_ending == "crlf" else b"\n"
assert expected_eol in content
# Verify that line endings are consistent
assert content.count(expected_eol) > 0
if line_ending == "crlf":
    assert content.replace(b"\r\n", b"").count(b"\n") == 0  # no bare LF outside CRLF pairs
else:
    assert content.count(b"\r") == 0  # no CR at all when LF is requested