Skip to content

Commit c758a56

Browse files
committed
Fix issues with AsyncBufferSequence.LineSequence
The main problem is that the internal buffer used by AsyncBufferSequence.LineSequence may end on the boundary of a line ending sequence -- this impacts primarily the UTF-8 encoding, but also impacts UTF-16 and UTF-32 with the \r\n sequence specifically. When this condition occurs, the range check prevents the peek-ahead to the next 1 or 2 bytes and prevents a complete line from being returned to the client. The buffer size is based on the page size by default, which on my macOS and Linux systems respectively are 16384 and 4096. This led to testLineSequence failing frequently on Linux due to the greater likelihood of a line ending being split across a multiple of the buffer size. To fix this, we always load more bytes into the buffer until the buffer no longer ends with a potential partial line ending sequence (unless we hit EOF which correctly causes an early return). Additionally, the testLineSequence test could generate empty lines, which meant that it was possible to have a line ending with \r followed by an (empty) line ending with \n, indistinguishable from a single line ending with \r\n. This is a problem for any line ending sequences where one is a prefix of another -- \r and \r\n are the only ones which meet this criteria. To fix that, prevent the test from ever generating an empty buffer, and since it does so by restricting the character set to Latin, will never again produce the problematic sequence. Also switch testTeardownSequence to use AsyncBufferSequence.LineSequence instead of its custom line splitting logic. This ensures the test works correctly regardless of buffer size, even with a contrived buffer size of 1. Closes #78
1 parent 8881dcf commit c758a56

File tree

2 files changed

+94
-102
lines changed

2 files changed

+94
-102
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 83 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,9 @@ extension AsyncBufferSequence {
104104

105105
private var source: AsyncBufferSequence.AsyncIterator
106106
private var buffer: [Encoding.CodeUnit]
107+
private var underlyingBuffer: [Encoding.CodeUnit]
108+
private var leftover: Encoding.CodeUnit?
107109
private var eofReached: Bool
108-
private var startIndex: Int
109110
private let bufferingPolicy: BufferingPolicy
110111

111112
internal init(
@@ -114,8 +115,9 @@ extension AsyncBufferSequence {
114115
) {
115116
self.source = underlyingIterator
116117
self.buffer = []
118+
self.underlyingBuffer = []
119+
self.leftover = nil
117120
self.eofReached = false
118-
self.startIndex = 0
119121
self.bufferingPolicy = bufferingPolicy
120122
}
121123

@@ -160,15 +162,32 @@ extension AsyncBufferSequence {
160162
return result.isEmpty ? nil : result
161163
}
162164

163-
func yield(at endIndex: Int) -> String? {
165+
func yield() -> String? {
164166
defer {
165-
self.buffer.removeFirst(endIndex)
166-
self.startIndex = 0
167+
self.buffer.removeAll(keepingCapacity: true)
167168
}
168169
if self.buffer.isEmpty {
169170
return nil
170171
}
171-
return String(decoding: self.buffer[0 ..< endIndex], as: Encoding.self)
172+
return String(decoding: self.buffer, as: Encoding.self)
173+
}
174+
175+
func nextFromSource() async throws -> Encoding.CodeUnit? {
176+
if underlyingBuffer.isEmpty {
177+
guard let buf = try await loadBuffer() else {
178+
return nil
179+
}
180+
underlyingBuffer = buf
181+
}
182+
return underlyingBuffer.removeFirst()
183+
}
184+
185+
func nextCodeUnit() async throws -> Encoding.CodeUnit? {
186+
defer { leftover = nil }
187+
if let leftover = leftover {
188+
return leftover
189+
}
190+
return try await nextFromSource()
172191
}
173192

174193
// https://en.wikipedia.org/wiki/Newline#Unicode
@@ -213,95 +232,70 @@ extension AsyncBufferSequence {
213232
fatalError("Unknown encoding type \(Encoding.self)")
214233
}
215234

216-
while true {
217-
// Step 1: Load more buffer if needed
218-
if self.startIndex >= self.buffer.count {
219-
guard let nextBuffer = try await loadBuffer() else {
220-
// We have no more data
221-
// Return the remaining data
222-
return yield(at: self.buffer.count)
223-
}
224-
self.buffer += nextBuffer
235+
while let first = try await nextCodeUnit() {
236+
// Throw if we exceed max line length
237+
if case .maxLineLength(let maxLength) = self.bufferingPolicy, buffer.count >= maxLength {
238+
throw SubprocessError(
239+
code: .init(.streamOutputExceedsLimit(maxLength)),
240+
underlyingError: nil
241+
)
225242
}
226-
// Step 2: Iterate through buffer to find next line
227-
var currentIndex: Int = self.startIndex
228-
for index in self.startIndex ..< self.buffer.count {
229-
currentIndex = index
230-
// Throw if we exceed max line length
231-
if case .maxLineLength(let maxLength) = self.bufferingPolicy,
232-
currentIndex >= maxLength {
233-
throw SubprocessError(
234-
code: .init(.streamOutputExceedsLimit(maxLength)),
235-
underlyingError: nil
236-
)
243+
244+
buffer.append(first)
245+
switch first {
246+
case carriageReturn:
247+
// Swallow up any subsequent LF
248+
guard let next = try await nextFromSource() else {
249+
return yield() // if we ran out of bytes, the last byte was a CR
237250
}
238-
let byte = self.buffer[currentIndex]
239-
switch byte {
240-
case carriageReturn:
241-
// Swallow any subsequent lineFeed if there is one
242-
var targetIndex = currentIndex
243-
if (currentIndex + 1) < self.buffer.count, self.buffer[currentIndex + 1] == lineFeed {
244-
targetIndex = currentIndex + 1
245-
}
246-
guard let result = yield(at: targetIndex + 1) else {
247-
continue
248-
}
249-
return result
250-
case lineFeed ..< carriageReturn:
251-
guard let result = yield(at: currentIndex + 1) else {
252-
continue
253-
}
254-
return result
255-
case newLine1:
256-
var targetIndex = currentIndex
257-
if Encoding.CodeUnit.self is UInt8.Type {
258-
// For UTF8, look for the next 0x85 byte
259-
guard (targetIndex + 1) < self.buffer.count,
260-
self.buffer[targetIndex + 1] == newLine2 else {
261-
// Not a valid new line. Keep looking
262-
continue
263-
}
264-
// Swallow 0x85 byte
265-
targetIndex += 1
266-
}
267-
guard let result = yield(at: targetIndex + 1) else {
268-
continue
269-
}
270-
return result
271-
case lineSeparator1, paragraphSeparator1:
272-
var targetIndex = currentIndex
273-
if Encoding.CodeUnit.self is UInt8.Type {
274-
// For UTF8, look for the next byte
275-
guard (targetIndex + 1) < self.buffer.count,
276-
self.buffer[targetIndex + 1] == lineSeparator2 ||
277-
self.buffer[targetIndex + 1] == paragraphSeparator2 else {
278-
// Not a valid new line. Keep looking
279-
continue
280-
}
281-
// Swallow next byte
282-
targetIndex += 1
283-
// Look for the final byte
284-
guard (targetIndex + 1) < self.buffer.count,
285-
(self.buffer[targetIndex + 1] == lineSeparator3 ||
286-
self.buffer[targetIndex + 1] == paragraphSeparator3) else {
287-
// Not a valid new line. Keep looking
288-
continue
289-
}
290-
// Swallow 0xA8 (or 0xA9) byte
291-
targetIndex += 1
292-
}
293-
guard let result = yield(at: targetIndex + 1) else {
294-
continue
295-
}
296-
return result
297-
default:
298-
// Keep searching
251+
buffer.append(next)
252+
guard next == lineFeed else {
253+
// if the next character was not an LF, save it for the next iteration and still return a line
254+
leftover = buffer.removeLast()
255+
return yield()
256+
}
257+
return yield()
258+
case newLine1 where Encoding.CodeUnit.self is UInt8.Type: // this may be used to compose other UTF8 characters
259+
guard let next = try await nextFromSource() else {
260+
// technically invalid UTF8 but it should be repaired to "\u{FFFD}"
261+
return yield()
262+
}
263+
buffer.append(next)
264+
guard next == newLine2 else {
265+
continue
266+
}
267+
return yield()
268+
case lineSeparator1 where Encoding.CodeUnit.self is UInt8.Type,
269+
paragraphSeparator1 where Encoding.CodeUnit.self is UInt8.Type:
270+
// Try to read: 80 [A8 | A9].
271+
// If we can't, then we put the byte in the buffer for error correction
272+
guard let next = try await nextFromSource() else {
273+
return yield()
274+
}
275+
buffer.append(next)
276+
guard next == lineSeparator2 || next == paragraphSeparator2 else {
299277
continue
300278
}
279+
guard let fin = try await nextFromSource() else {
280+
return yield()
281+
}
282+
buffer.append(fin)
283+
guard fin == lineSeparator3 || fin == paragraphSeparator3 else {
284+
continue
285+
}
286+
return yield()
287+
case lineFeed ..< carriageReturn, newLine1, lineSeparator1, paragraphSeparator1:
288+
return yield()
289+
default:
290+
continue
301291
}
302-
// There is no new line in the buffer. Load more buffer and try again
303-
self.startIndex = currentIndex + 1
304292
}
293+
294+
// Don't emit an empty newline when there is no more content (e.g. end of file)
295+
if !buffer.isEmpty {
296+
return yield()
297+
}
298+
return nil
305299
}
306300
}
307301

Tests/SubprocessTests/SubprocessTests+Unix.swift

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -760,15 +760,8 @@ extension SubprocessUnixTests {
760760
}
761761
group.addTask {
762762
var outputs: [String] = []
763-
for try await bit in standardOutput {
764-
let bitString = bit.withUnsafeBytes { ptr in
765-
return String(decoding: ptr, as: UTF8.self)
766-
}.trimmingCharacters(in: .whitespacesAndNewlines)
767-
if bitString.contains("\n") {
768-
outputs.append(contentsOf: bitString.split(separator: "\n").map { String($0) })
769-
} else {
770-
outputs.append(bitString)
771-
}
763+
for try await line in standardOutput.lines() {
764+
outputs.append(line.trimmingCharacters(in: .newlines))
772765
}
773766
#expect(outputs == ["saw SIGQUIT", "saw SIGTERM", "saw SIGINT"])
774767
}
@@ -881,17 +874,21 @@ extension SubprocessUnixTests {
881874
let length: Int
882875
switch size {
883876
case .large:
884-
length = Int(Double.random(in: 1.0 ..< 2.0) * Double(readBufferSize))
877+
length = Int(Double.random(in: 1.0 ..< 2.0) * Double(readBufferSize)) + 1
885878
case .medium:
886-
length = Int(Double.random(in: 0.2 ..< 1.0) * Double(readBufferSize))
879+
length = Int(Double.random(in: 0.2 ..< 1.0) * Double(readBufferSize)) + 1
887880
case .small:
888-
length = Int.random(in: 0 ..< 16)
881+
length = Int.random(in: 1 ..< 16)
889882
}
890883

891884
var buffer: [UInt8] = Array(repeating: 0, count: length)
892885
for index in 0 ..< length {
893886
buffer[index] = UInt8.random(in: range)
894887
}
888+
// Buffer cannot be empty or a line with a \r ending followed by an empty one with a \n ending would be indistinguishable.
889+
// This matters for any line ending sequences where one line ending sequence is the prefix of another. \r and \r\n are the
890+
// only two which meet this criteria.
891+
precondition(!buffer.isEmpty)
895892
return buffer
896893
}
897894

@@ -954,6 +951,8 @@ extension SubprocessUnixTests {
954951
) { execution, standardOutput in
955952
var index = 0
956953
for try await line in standardOutput.lines(encoding: UTF8.self) {
954+
defer { index += 1 }
955+
try #require(index < testCases.count, "Received more lines than expected")
957956
#expect(
958957
line == testCases[index].value,
959958
"""
@@ -963,7 +962,6 @@ extension SubprocessUnixTests {
963962
Line Ending \(Array(testCases[index].newLine.utf8))
964963
"""
965964
)
966-
index += 1
967965
}
968966
}
969967
try FileManager.default.removeItem(at: testFilePath)

0 commit comments

Comments
 (0)