Skip to content

Commit 693fe9e

Browse files
committed
Update S3ASeekableStream Implementation to throw when stream is closed
1 parent 2d23c5b commit 693fe9e

File tree

1 file changed

+29
-12
lines changed

1 file changed

+29
-12
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.IOException;
44

5+
import org.apache.hadoop.fs.FSExceptionMessages;
56
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
78

@@ -14,48 +15,46 @@
1415
public class S3ASeekableStream extends FSInputStream {
1516

1617
private S3SeekableInputStream inputStream;
18+
private long lastReadCurrentPos = 0;
1719
private final String key;
1820

1921
public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
2022

21-
22-
public S3ASeekableStream(String bucket, String key, S3SeekableInputStreamFactory s3SeekableInputStreamFactory)
23-
throws IOException {
23+
public S3ASeekableStream(String bucket, String key, S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
2424
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
2525
this.key = key;
2626
}
2727

2828
@Override
2929
public int read() throws IOException {
30+
throwIfClosed();
3031
return inputStream.read();
3132
}
3233

3334
@Override
3435
public void seek(long pos) throws IOException {
36+
throwIfClosed();
3537
inputStream.seek(pos);
3638
}
3739

38-
@Override
39-
public long getPos() throws IOException {
40-
return inputStream.getPos();
41-
}
4240

4341
@Override
44-
public void close() throws IOException {
45-
if (inputStream != null) {
46-
inputStream.close();
47-
inputStream = null;
48-
super.close();
42+
public synchronized long getPos() {
43+
if (!isClosed()) {
44+
lastReadCurrentPos = inputStream.getPos();
4945
}
46+
return lastReadCurrentPos;
5047
}
5148

5249

5350
public void readTail(byte[] buf, int off, int n) throws IOException {
51+
throwIfClosed();
5452
inputStream.readTail(buf, off, n);
5553
}
5654

5755
@Override
5856
public int read(byte[] buf, int off, int len) throws IOException {
57+
throwIfClosed();
5958
return inputStream.read(buf, off, len);
6059
}
6160

@@ -65,4 +64,22 @@ public boolean seekToNewSource(long l) throws IOException {
6564
return false;
6665
}
6766

67+
@Override
68+
public void close() throws IOException {
69+
if (inputStream != null) {
70+
inputStream.close();
71+
inputStream = null;
72+
super.close();
73+
}
74+
}
75+
76+
protected void throwIfClosed() throws IOException {
77+
if (isClosed()) {
78+
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
79+
}
80+
}
81+
82+
protected boolean isClosed() {
83+
return inputStream == null;
84+
}
6885
}

0 commit comments

Comments
 (0)