diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java index 2e87f916eb6..b7ded9201ca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.crypto; +import java.io.EOFException; import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FilterInputStream; @@ -34,6 +35,7 @@ import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetReadahead; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.HasFileDescriptor; import org.apache.hadoop.fs.PositionedReadable; @@ -395,7 +397,9 @@ public void readFully(long position, byte[] buffer) throws IOException { /** Seek to a position. */ @Override public void seek(long pos) throws IOException { - Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset."); + if (pos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); + } checkStream(); try { /* diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index 0e78b653c73..641fdc25191 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -182,20 +182,18 @@ public int available() throws IOException { public int read(long position, byte[] b, int off, int len) throws IOException { // parameter check - if ((off | len | (off + len) | (b.length - (off + len))) < 0) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { + validatePositionedReadArgs(position, b, off, len); + if (len == 0) { return 0; } - if( position<0 ) { - throw new IllegalArgumentException( - "Parameter position can not to be negative"); - } - ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file); - checker.seek(position); - int nread = checker.read(b, off, len); - checker.close(); + int nread; + try (ChecksumFSInputChecker checker = + new ChecksumFSInputChecker(fs, file)) { + checker.seek(position); + nread = checker.read(b, off, len); + checker.close(); + } return nread; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java index 7dc4a809cd9..e5cecf65c54 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java @@ -164,28 +164,26 @@ protected long getChunkPosition(long dataPos) { public int available() throws IOException { return datas.available() + super.available(); } - + @Override public int read(long position, byte[] b, int off, int len) throws IOException, UnresolvedLinkException { // parameter check - if ((off | len | (off + len) | (b.length - (off + len))) < 0) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { + validatePositionedReadArgs(position, b, off, len); + if 
(len == 0) { return 0; } - if (position<0) { - throw new IllegalArgumentException( - "Parameter position can not to be negative"); - } - ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file); - checker.seek(position); - int nread = checker.read(b, off, len); - checker.close(); + int nread; + try (ChecksumFSInputChecker checker = + new ChecksumFSInputChecker(fs, file)) { + checker.seek(position); + nread = checker.read(b, off, len); + checker.close(); + } return nread; } - + @Override public void close() throws IOException { datas.close(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index 477bd6f47ee..da987692af0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -18,18 +18,21 @@ */ package org.apache.hadoop.fs; -import java.io.*; +import java.io.DataInputStream; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.EnumSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.ByteBufferPool; -import org.apache.hadoop.fs.ByteBufferUtil; import org.apache.hadoop.util.IdentityHashStore; /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream} - * and buffers input through a {@link BufferedInputStream}. */ + * and buffers input through a {@link java.io.BufferedInputStream}. */ @InterfaceAudience.Public @InterfaceStability.Stable public class FSDataInputStream extends DataInputStream @@ -97,6 +100,7 @@ public int read(long position, byte[] buffer, int offset, int length) * @param buffer buffer into which data is read * @param offset offset into the buffer in which data is written * @param length the number of bytes to read + * @throws IOException IO problems * @throws EOFException If the end of stream is reached while reading. * If an exception is thrown an undetermined number * of bytes in the buffer may have been written. 
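As a caller-side illustration of the semantics this patch tightens (a minimal sketch; the demo class, file path, and buffer sizes are hypothetical, not part of the patch): positioned `read()` may return fewer bytes than requested, or -1 at end of file, while `readFully()` either fills the requested range or raises `EOFException`; neither moves the stream's `getPos()`.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PositionedReadDemo {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path p = new Path("/tmp/example.bin");  // hypothetical test file
        try (FSDataInputStream in = fs.open(p)) {
          byte[] header = new byte[16];
          // readFully(): fills all 16 bytes or throws EOFException;
          // the stream's current position (getPos()) is not changed.
          in.readFully(0, header);
          // read(): returns the count actually read, which may be short,
          // or -1 when the position is at or past the end of the file.
          byte[] chunk = new byte[1024];
          int n = in.read(0, chunk, 0, chunk.length);
          System.out.println("positioned read returned " + n + " bytes");
        }
      }
    }
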
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java index b80fb30f94b..95724ffc877 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSExceptionMessages.java @@ -40,4 +40,10 @@ public class FSExceptionMessages { */ public static final String CANNOT_SEEK_PAST_EOF = "Attempted to seek or read past the end of the file"; + + public static final String EOF_IN_READ_FULLY = + "End of file reached before reading fully."; + + public static final String TOO_MANY_BYTES_FOR_DEST_BUFFER + = "Requested more bytes than destination buffer size"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java index 148e6745f60..64fbb45ea55 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java @@ -17,22 +17,28 @@ */ package org.apache.hadoop.fs; -import java.io.*; -import java.nio.ByteBuffer; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.ZeroCopyUnavailableException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /**************************************************************** * FSInputStream is a generic old InputStream with a little bit * of RAF-style seek ability. * *****************************************************************/ -@InterfaceAudience.LimitedPrivate({"HDFS"}) -@InterfaceStability.Unstable +@InterfaceAudience.Public +@InterfaceStability.Evolving public abstract class FSInputStream extends InputStream implements Seekable, PositionedReadable { + private static final Logger LOG = + LoggerFactory.getLogger(FSInputStream.class); + /** * Seek to the given offset from the start of the file. * The next read() will be from that location. Can't @@ -57,32 +63,69 @@ public abstract class FSInputStream extends InputStream @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { + validatePositionedReadArgs(position, buffer, offset, length); + if (length == 0) { + return 0; + } synchronized (this) { long oldPos = getPos(); int nread = -1; try { seek(position); nread = read(buffer, offset, length); + } catch (EOFException e) { + // end of file; this can be raised by some filesystems + // (often: object stores); it is swallowed here. + LOG.debug("Downgrading EOFException raised trying to" + + " read {} bytes at offset {}", length, offset, e); } finally { seek(oldPos); } return nread; } } - + + /** + * Validation code, available for use in subclasses. + * @param position position: if negative an EOF exception is raised + * @param buffer destination buffer + * @param offset offset within the buffer + * @param length length of bytes to read + * @throws EOFException if the position is negative + * @throws IndexOutOfBoundsException if there isn't space for the amount of + * data requested. + * @throws IllegalArgumentException other arguments are invalid. 
+ */ + protected void validatePositionedReadArgs(long position, + byte[] buffer, int offset, int length) throws EOFException { + Preconditions.checkArgument(length >= 0, "length is negative"); + if (position < 0) { + throw new EOFException("position is negative"); + } + Preconditions.checkArgument(buffer != null, "Null buffer"); + if (buffer.length - offset < length) { + throw new IndexOutOfBoundsException( + FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER); + } + } + @Override public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + validatePositionedReadArgs(position, buffer, offset, length); int nread = 0; while (nread < length) { - int nbytes = read(position+nread, buffer, offset+nread, length-nread); + int nbytes = read(position + nread, + buffer, + offset + nread, + length - nread); if (nbytes < 0) { - throw new EOFException("End of file reached before reading fully."); + throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY); } nread += nbytes; } } - + @Override public void readFully(long position, byte[] buffer) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java index 8a16e6a5f71..3a2da26695c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java @@ -26,6 +26,7 @@ import org.apache.hadoop.util.LineReader; import org.apache.hadoop.util.Progressable; +import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -1053,16 +1054,15 @@ public int read(long pos, byte[] b, int offset, int length) @Override public void readFully(long pos, byte[] b, int offset, int length) throws IOException { + validatePositionedReadArgs(pos, b, offset, length); + if (length == 0) { + return; + } if (start + length + pos > end) { - throw new IOException("Not enough bytes to read."); + throw new EOFException("Not enough bytes to read."); } underLyingStream.readFully(pos + start, b, offset, length); } - - @Override - public void readFully(long pos, byte[] b) throws IOException { - readFully(pos, b, 0, b.length); - } @Override public void setReadahead(Long readahead) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java index a2384cd8b0b..6744d17a726 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -22,30 +22,67 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -/** Stream that permits positional reading. */ +/** + * Stream that permits positional reading. + * + * Implementations are required to implement thread-safe operations; this may + * be supported by concurrent access to the data, or by using a synchronization + * mechanism to serialize access. + * + * Not all implementations meet this requirement. Those that do not cannot + * be used as a backing store for some applications, such as Apache HBase. 
+ * + * Independent of whether or not they are thread safe, some implementations + * may make the intermediate state of the system, specifically the position + * obtained in {@code Seekable.getPos()} visible. + */ @InterfaceAudience.Public @InterfaceStability.Evolving public interface PositionedReadable { /** - * Read upto the specified number of bytes, from a given + * Read up to the specified number of bytes, from a given * position within a file, and return the number of bytes read. This does not * change the current offset of a file, and is thread-safe. + * + * Warning: Not all filesystems satisfy the thread-safety requirement. + * @param position position within file + * @param buffer destination buffer + * @param offset offset in the buffer + * @param length number of bytes to read + * @return actual number of bytes read; -1 means "none" + * @throws IOException IO problems. */ - public int read(long position, byte[] buffer, int offset, int length) + int read(long position, byte[] buffer, int offset, int length) throws IOException; /** * Read the specified number of bytes, from a given * position within a file. This does not * change the current offset of a file, and is thread-safe. + * + * Warning: Not all filesystems satisfy the thread-safety requirement. + * @param position position within file + * @param buffer destination buffer + * @param offset offset in the buffer + * @param length number of bytes to read + * @throws IOException IO problems. + * @throws EOFException the end of the data was reached before + * the read operation completed */ - public void readFully(long position, byte[] buffer, int offset, int length) + void readFully(long position, byte[] buffer, int offset, int length) throws IOException; /** * Read number of bytes equal to the length of the buffer, from a given * position within a file. This does not * change the current offset of a file, and is thread-safe. + * + * Warning: Not all filesystems satisfy the thread-safety requirement. + * @param position position within file + * @param buffer destination buffer + * @throws IOException IO problems. 
+ * @throws EOFException the end of the data was reached before + * the read operation completed */ - public void readFully(long position, byte[] buffer) throws IOException; + void readFully(long position, byte[] buffer) throws IOException; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index 318bbb063ff..7e6a2662062 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -157,6 +157,8 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { + // parameter check + validatePositionedReadArgs(position, b, off, len); try { int value = fis.read(b, off, len); if (value > 0) { @@ -172,6 +174,12 @@ public int read(byte[] b, int off, int len) throws IOException { @Override public int read(long position, byte[] b, int off, int len) throws IOException { + // parameter check + validatePositionedReadArgs(position, b, off, len); + if (len == 0) { + return 0; + } + ByteBuffer bb = ByteBuffer.wrap(b, off, len); try { int value = fis.getChannel().read(bb, position); diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index 4b4318ef3f7..de6b861a66c 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -279,6 +279,9 @@ on the underlying stream: read(dest3, ... len3) -> dest3[0..len3 - 1] = [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1)] +Note that implementations are not required to be atomic; the intermediate state +of the operation (the change in the value of `getPos()`) may be visible. + #### Implementation preconditions Not all `FSDataInputStream` implementations support these operations. Those that do @@ -287,7 +290,7 @@ interface. supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException] -This could be considered obvious: if a stream is not Seekable, a client +This could be considered obvious: if a stream is not `Seekable`, a client cannot seek to a location. It is also a side effect of the base class implementation, which uses `Seekable.seek()`. @@ -304,14 +307,14 @@ For any operations that fail, the contents of the destination `buffer` are undefined. Implementations may overwrite part or all of the buffer before reporting a failure. - - ### `int PositionedReadable.read(position, buffer, offset, length)` +Read as much data as possible into the buffer space allocated for it. + #### Preconditions - position > 0 else raise [IllegalArgumentException, RuntimeException] - len(buffer) + offset < len(data) else raise [IndexOutOfBoundException, RuntimeException] + position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException] + len(buffer) - offset >= length else raise [IndexOutOfBoundsException, RuntimeException] length >= 0 offset >= 0 @@ -324,23 +327,36 @@ of data available from the specified position: buffer'[offset..(offset+available-1)] = data[position..position+available -1] result = available +1. A return value of -1 means that the stream had no more available data. 
+1. An invocation with `length==0` implicitly does not read any data; +implementations may short-cut the operation and omit any IO. In such instances, +checks for the stream being at the end of the file may be omitted. +1. If an IO exception occurs during the read operation(s), +the final state of `buffer` is undefined. ### `void PositionedReadable.readFully(position, buffer, offset, length)` +Read exactly `length` bytes of data into the buffer, failing if there is not +enough data available. + #### Preconditions - position > 0 else raise [IllegalArgumentException, RuntimeException] + position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException] length >= 0 offset >= 0 + len(buffer) - offset >= length else raise [IndexOutOfBoundsException, RuntimeException] (position + length) <= len(data) else raise [EOFException, IOException] - len(buffer) + offset < len(data) + +If an IO exception occurs during the read operation(s), +the final state of `buffer` is undefined. + +If there is not enough data in the input stream to satisfy the request, +the final state of `buffer` is undefined. #### Postconditions -The amount of data read is the less of the length or the amount -of data available from the specified position: +The buffer from offset `offset` is filled with the data starting at `position`: - let available = min(length, len(data)-position) buffer'[offset..(offset+length-1)] = data[position..(position + length -1)] ### `PositionedReadable.readFully(position, buffer)` The semantics of this are exactly equivalent to readFully(position, buffer, 0, len(buffer)) +That is, the buffer is filled entirely with the contents of the input source +from position `position`. + ## Consistency diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java index 86bb64d882c..f9c8c165edd 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.crypto; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -29,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.ReadOption; @@ -339,7 +341,7 @@ private void readFullyCheck(InputStream in, int pos) throws Exception { try { ((PositionedReadable) in).readFully(pos, result); Assert.fail("Read fully exceeds maximum length should fail."); - } catch (IOException e) { + } catch (EOFException e) { } } @@ -365,9 +367,9 @@ public void testSeek() throws Exception { try { seekCheck(in, -3); Assert.fail("Seek to negative offset should fail."); - } catch (IllegalArgumentException e) { - GenericTestUtils.assertExceptionContains("Cannot seek to negative " + - "offset", e); + } catch (EOFException e) { + GenericTestUtils.assertExceptionContains( + FSExceptionMessages.NEGATIVE_SEEK, e); } Assert.assertEquals(pos, ((Seekable) in).getPos()); diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractAppendTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractAppendTest.java index d3e674174d1..6b3e98bd95a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractAppendTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractAppendTest.java @@ -52,11 +52,8 @@ public void setup() throws Exception { public void testAppendToEmptyFile() throws Throwable { touch(getFileSystem(), target); byte[] dataset = dataset(256, 'a', 'z'); - FSDataOutputStream outputStream = getFileSystem().append(target); - try { + try (FSDataOutputStream outputStream = getFileSystem().append(target)) { outputStream.write(dataset); - } finally { - outputStream.close(); } byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target, dataset.length); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractConcatTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractConcatTest.java index 69e902b1b0e..7b120861edc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractConcatTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractConcatTest.java @@ -53,7 +53,7 @@ public void setup() throws Exception { target = new Path(testPath, "target"); byte[] block = dataset(TEST_FILE_LEN, 0, 255); - createFile(getFileSystem(), srcFile, false, block); + createFile(getFileSystem(), srcFile, true, block); touch(getFileSystem(), zeroByteFile); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java index f42ab781873..9344225d175 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCreateTest.java @@ -123,7 +123,7 @@ public void testOverwriteNonEmptyDirectory() throws Throwable { } catch (AssertionError failure) { if (isSupported(IS_BLOBSTORE)) { // file/directory hack surfaces here - throw new AssumptionViolatedException(failure.toString()).initCause(failure); + throw new AssumptionViolatedException(failure.toString(), failure); } // else: rethrow throw failure; @@ -163,13 +163,11 @@ public void testOverwriteNonEmptyDirectory() throws Throwable { public void testCreatedFileIsImmediatelyVisible() throws Throwable { describe("verify that a newly created file exists as soon as open returns"); Path path = path("testCreatedFileIsImmediatelyVisible"); - FSDataOutputStream out = null; - try { - out = getFileSystem().create(path, + try(FSDataOutputStream out = getFileSystem().create(path, false, 4096, (short) 1, - 1024); + 1024)) { if (!getFileSystem().exists(path)) { if (isSupported(IS_BLOBSTORE)) { @@ -180,8 +178,6 @@ public void testCreatedFileIsImmediatelyVisible() throws Throwable { assertPathExists("expected path to be visible before anything written", path); } - } finally { - IOUtils.closeStream(out); } } } diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractDeleteTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractDeleteTest.java index 2bd60ca3731..6809fb339b5 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractDeleteTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractDeleteTest.java @@ -47,7 +47,7 @@ public void testDeleteEmptyDirRecursive() throws Throwable { @Test public void testDeleteNonexistentPathRecursive() throws Throwable { Path path = path("testDeleteNonexistentPathRecursive"); - ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "leftover", path); + assertPathDoesNotExist("leftover", path); ContractTestUtils.rejectRootOperation(path); assertFalse("Returned true attempting to delete" + " a nonexistent path " + path, @@ -58,7 +58,7 @@ public void testDeleteNonexistentPathRecursive() throws Throwable { @Test public void testDeleteNonexistentPathNonRecursive() throws Throwable { Path path = path("testDeleteNonexistentPathNonRecursive"); - ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "leftover", path); + assertPathDoesNotExist("leftover", path); ContractTestUtils.rejectRootOperation(path); assertFalse("Returned true attempting to recursively delete" + " a nonexistent path " + path, @@ -81,7 +81,7 @@ public void testDeleteNonEmptyDirNonRecursive() throws Throwable { //expected handleExpectedException(expected); } - ContractTestUtils.assertIsDirectory(getFileSystem(), path); + assertIsDirectory(path); } @Test @@ -92,7 +92,7 @@ public void testDeleteNonEmptyDirRecursive() throws Throwable { ContractTestUtils.writeTextFile(getFileSystem(), file, "goodbye, world", true); assertDeleted(path, true); - ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "not deleted", file); + assertPathDoesNotExist("not deleted", file); } @Test @@ -100,12 +100,11 @@ public void testDeleteDeepEmptyDir() throws Throwable { mkdirs(path("testDeleteDeepEmptyDir/d1/d2/d3/d4")); assertDeleted(path("testDeleteDeepEmptyDir/d1/d2/d3"), true); - FileSystem fs = getFileSystem(); - ContractTestUtils.assertPathDoesNotExist(fs, + assertPathDoesNotExist( "not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3/d4")); - ContractTestUtils.assertPathDoesNotExist(fs, + assertPathDoesNotExist( "not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3")); - ContractTestUtils.assertPathExists(fs, "parent dir is deleted", + assertPathExists( "parent dir is deleted", path("testDeleteDeepEmptyDir/d1/d2")); } @@ -117,8 +116,7 @@ public void testDeleteSingleFile() throws Throwable { Path file = new Path(path, "childfile"); ContractTestUtils.writeTextFile(getFileSystem(), file, "single file to be deleted.", true); - ContractTestUtils.assertPathExists(getFileSystem(), - "single file not created", file); + assertPathExists("single file not created", file); assertDeleted(file, false); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMkdirTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMkdirTest.java index 86fd61f72b2..427b0e972d2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMkdirTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMkdirTest.java @@ -67,12 +67,9 @@ 
public void testNoMkdirOverFile() throws Throwable { boolean made = fs.mkdirs(path); fail("mkdirs did not fail over a file but returned " + made + "; " + ls(path)); - } catch (ParentNotDirectoryException e) { + } catch (ParentNotDirectoryException | FileAlreadyExistsException e) { //parent is a directory handleExpectedException(e); - } catch (FileAlreadyExistsException e) { - //also allowed as an exception (HDFS) - handleExpectedException(e);; } catch (IOException e) { //here the FS says "no create" handleRelaxedException("mkdirs", "FileAlreadyExistsException", e); @@ -97,11 +94,9 @@ public void testMkdirOverParentFile() throws Throwable { boolean made = fs.mkdirs(child); fail("mkdirs did not fail over a file but returned " + made + "; " + ls(path)); - } catch (ParentNotDirectoryException e) { + } catch (ParentNotDirectoryException | FileAlreadyExistsException e) { //parent is a directory handleExpectedException(e); - } catch (FileAlreadyExistsException e) { - handleExpectedException(e); } catch (IOException e) { handleRelaxedException("mkdirs", "ParentNotDirectoryException", e); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java index cbbb27e91eb..f9b16f47949 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractOpenTest.java @@ -125,10 +125,10 @@ public void testOpenFileTwice() throws Throwable { createFile(getFileSystem(), path, false, block); //open first FSDataInputStream instream1 = getFileSystem().open(path); - int c = instream1.read(); - assertEquals(0,c); FSDataInputStream instream2 = null; try { + int c = instream1.read(); + assertEquals(0,c); instream2 = getFileSystem().open(path); assertEquals("first read of instream 2", 0, instream2.read()); assertEquals("second read of instream 1", 1, instream1.read()); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java index 04c444de8d8..b0dcb936c7c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java @@ -26,8 +26,7 @@ import java.io.FileNotFoundException; import java.io.IOException; -import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; -import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; /** * Test creating files, overwrite options &c @@ -46,9 +45,9 @@ public void testRenameNewFileSameDir() throws Throwable { boolean rename = rename(renameSrc, renameTarget); assertTrue("rename("+renameSrc+", "+ renameTarget+") returned false", rename); - ContractTestUtils.assertListStatusFinds(getFileSystem(), + assertListStatusFinds(getFileSystem(), renameTarget.getParent(), renameTarget); - ContractTestUtils.verifyFileContents(getFileSystem(), renameTarget, data); + verifyFileContents(getFileSystem(), renameTarget, data); } @Test @@ -129,7 +128,7 @@ public void testRenameFileOverExistingFile() throws Throwable { } // verify that 
the destination file is as expected based on the expected // outcome - ContractTestUtils.verifyFileContents(getFileSystem(), destFile, + verifyFileContents(getFileSystem(), destFile, destUnchanged? destData: srcData); } @@ -154,7 +153,7 @@ public void testRenameDirIntoExistingDir() throws Throwable { Path renamedSrc = new Path(destDir, sourceSubdir); assertIsFile(destFilePath); assertIsDirectory(renamedSrc); - ContractTestUtils.verifyFileContents(fs, destFilePath, destDateset); + verifyFileContents(fs, destFilePath, destDateset); assertTrue("rename returned false though the contents were copied", rename); } @@ -172,10 +171,10 @@ public void testRenameFileNonexistentDir() throws Throwable { boolean rename = rename(renameSrc, renameTarget); if (renameCreatesDestDirs) { assertTrue(rename); - ContractTestUtils.verifyFileContents(getFileSystem(), renameTarget, data); + verifyFileContents(getFileSystem(), renameTarget, data); } else { assertFalse(rename); - ContractTestUtils.verifyFileContents(getFileSystem(), renameSrc, data); + verifyFileContents(getFileSystem(), renameSrc, data); } } catch (FileNotFoundException e) { // allowed unless that rename flag is set @@ -191,36 +190,36 @@ public void testRenameWithNonEmptySubDir() throws Throwable { final Path finalDir = new Path(renameTestDir, "dest"); FileSystem fs = getFileSystem(); boolean renameRemoveEmptyDest = isSupported(RENAME_REMOVE_DEST_IF_EMPTY_DIR); - ContractTestUtils.rm(fs, renameTestDir, true, false); + rm(fs, renameTestDir, true, false); fs.mkdirs(srcDir); fs.mkdirs(finalDir); - ContractTestUtils.writeTextFile(fs, new Path(srcDir, "source.txt"), + writeTextFile(fs, new Path(srcDir, "source.txt"), "this is the file in src dir", false); - ContractTestUtils.writeTextFile(fs, new Path(srcSubDir, "subfile.txt"), + writeTextFile(fs, new Path(srcSubDir, "subfile.txt"), "this is the file in src/sub dir", false); - ContractTestUtils.assertPathExists(fs, "not created in src dir", + assertPathExists("not created in src dir", new Path(srcDir, "source.txt")); - ContractTestUtils.assertPathExists(fs, "not created in src/sub dir", + assertPathExists("not created in src/sub dir", new Path(srcSubDir, "subfile.txt")); fs.rename(srcDir, finalDir); // Accept both POSIX rename behavior and CLI rename behavior if (renameRemoveEmptyDest) { // POSIX rename behavior - ContractTestUtils.assertPathExists(fs, "not renamed into dest dir", + assertPathExists("not renamed into dest dir", new Path(finalDir, "source.txt")); - ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir", + assertPathExists("not renamed into dest/sub dir", new Path(finalDir, "sub/subfile.txt")); } else { // CLI rename behavior - ContractTestUtils.assertPathExists(fs, "not renamed into dest dir", + assertPathExists("not renamed into dest dir", new Path(finalDir, "src1/source.txt")); - ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir", + assertPathExists("not renamed into dest/sub dir", new Path(finalDir, "src1/sub/subfile.txt")); } - ContractTestUtils.assertPathDoesNotExist(fs, "not deleted", + assertPathDoesNotExist("not deleted", new Path(srcDir, "source.txt")); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java index fb1455e618f..7273945efc6 100644 --- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java @@ -51,7 +51,7 @@ public void testMkDirDepth1() throws Throwable { Path dir = new Path("/testmkdirdepth1"); assertPathDoesNotExist("directory already exists", dir); fs.mkdirs(dir); - ContractTestUtils.assertIsDirectory(getFileSystem(), dir); + assertIsDirectory(dir); assertPathExists("directory already exists", dir); assertDeleted(dir, true); } @@ -61,10 +61,10 @@ public void testRmEmptyRootDirNonRecursive() throws Throwable { //extra sanity checks here to avoid support calls about complete loss of data skipIfUnsupported(TEST_ROOT_TESTS_ENABLED); Path root = new Path("/"); - ContractTestUtils.assertIsDirectory(getFileSystem(), root); + assertIsDirectory(root); boolean deleted = getFileSystem().delete(root, true); LOG.info("rm / of empty dir result is {}", deleted); - ContractTestUtils.assertIsDirectory(getFileSystem(), root); + assertIsDirectory(root); } @Test @@ -75,7 +75,7 @@ public void testRmNonEmptyRootDirNonRecursive() throws Throwable { String touchfile = "/testRmNonEmptyRootDirNonRecursive"; Path file = new Path(touchfile); ContractTestUtils.touch(getFileSystem(), file); - ContractTestUtils.assertIsDirectory(getFileSystem(), root); + assertIsDirectory(root); try { boolean deleted = getFileSystem().delete(root, false); fail("non recursive delete should have raised an exception," + @@ -86,7 +86,7 @@ public void testRmNonEmptyRootDirNonRecursive() throws Throwable { } finally { getFileSystem().delete(file, false); } - ContractTestUtils.assertIsDirectory(getFileSystem(), root); + assertIsDirectory(root); } @Test @@ -94,11 +94,11 @@ public void testRmRootRecursive() throws Throwable { //extra sanity checks here to avoid support calls about complete loss of data skipIfUnsupported(TEST_ROOT_TESTS_ENABLED); Path root = new Path("/"); - ContractTestUtils.assertIsDirectory(getFileSystem(), root); + assertIsDirectory(root); Path file = new Path("/testRmRootRecursive"); ContractTestUtils.touch(getFileSystem(), file); boolean deleted = getFileSystem().delete(root, true); - ContractTestUtils.assertIsDirectory(getFileSystem(), root); + assertIsDirectory(root); LOG.info("rm -rf / result is {}", deleted); if (deleted) { assertPathDoesNotExist("expected file to be deleted", file); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java index 8f5651021b2..f1ca8cb8d5a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.junit.Test; @@ -31,9 +32,9 @@ import java.io.IOException; import java.util.Random; -import static org.apache.hadoop.fs.contract.ContractTestUtils.cleanup; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyRead; @@ -46,7 +47,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas public static final int DEFAULT_RANDOM_SEEK_COUNT = 100; - private Path testPath; private Path smallSeekFile; private Path zeroByteFile; private FSDataInputStream instream; @@ -56,13 +56,13 @@ public void setup() throws Exception { super.setup(); skipIfUnsupported(SUPPORTS_SEEK); //delete the test directory - testPath = getContract().getTestPath(); smallSeekFile = path("seekfile.txt"); zeroByteFile = path("zero.txt"); byte[] block = dataset(TEST_FILE_LEN, 0, 255); //this file now has a simple rule: offset => value - createFile(getFileSystem(), smallSeekFile, false, block); - touch(getFileSystem(), zeroByteFile); + FileSystem fs = getFileSystem(); + createFile(fs, smallSeekFile, true, block); + touch(fs, zeroByteFile); } @Override @@ -79,6 +79,21 @@ public void teardown() throws Exception { super.teardown(); } + /** + * Skip a test case if the FS doesn't support positioned readable. + * This should hold automatically if the FS supports seek, even + * if it doesn't support seeking past the EOF. + * And, because this test suite requires seek to be supported, the + * feature is automatically assumed to be true unless stated otherwise. + */ + protected void assumeSupportsPositionedReadable() throws IOException { + // option defaults to true, as seek support is already required + if (!getContract().isSupported(SUPPORTS_POSITIONED_READABLE, true)) { + skip("Skipping as unsupported feature: " + + SUPPORTS_POSITIONED_READABLE); + } + } + @Test public void testSeekZeroByteFile() throws Throwable { describe("seek and read a 0 byte file"); @@ -282,6 +297,7 @@ public void testSeekBigFile() throws Throwable { public void testPositionedBulkReadDoesntChangePosition() throws Throwable { describe( "verify that a positioned read does not change the getPos() value"); + assumeSupportsPositionedReadable(); Path testSeekFile = path("bigseekfile.txt"); byte[] block = dataset(65536, 0, 255); createFile(getFileSystem(), testSeekFile, false, block); @@ -290,8 +306,9 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable { assertTrue(-1 != instream.read()); assertEquals(40000, instream.getPos()); - byte[] readBuffer = new byte[256]; - instream.read(128, readBuffer, 0, readBuffer.length); + int v = 256; + byte[] readBuffer = new byte[v]; + assertEquals(v, instream.read(128, readBuffer, 0, v)); //have gone back assertEquals(40000, instream.getPos()); //content is the same too @@ -317,12 +334,11 @@ public void testRandomSeeks() throws Throwable { Path randomSeekFile = path("testrandomseeks.bin"); createFile(getFileSystem(), randomSeekFile, false, buf); Random r = new Random(); - FSDataInputStream stm = getFileSystem().open(randomSeekFile); // Record the sequence of seeks and reads which trigger a failure. 
int[] seeks = new int[10]; int[] reads = new int[10]; - try { + try (FSDataInputStream stm = getFileSystem().open(randomSeekFile)) { for (int i = 0; i < limit; i++) { int seekOff = r.nextInt(buf.length); int toRead = r.nextInt(Math.min(buf.length - seekOff, 32000)); @@ -336,13 +352,232 @@ public void testRandomSeeks() throws Throwable { sb.append("Sequence of actions:\n"); for (int j = 0; j < seeks.length; j++) { sb.append("seek @ ").append(seeks[j]).append(" ") - .append("read ").append(reads[j]).append("\n"); + .append("read ").append(reads[j]).append("\n"); } LOG.error(sb.toString()); throw afe; - } finally { - stm.close(); } } + @Test + public void testReadFullyZeroByteFile() throws Throwable { + describe("readFully against a 0 byte file"); + assumeSupportsPositionedReadable(); + instream = getFileSystem().open(zeroByteFile); + assertEquals(0, instream.getPos()); + byte[] buffer = new byte[1]; + instream.readFully(0, buffer, 0, 0); + assertEquals(0, instream.getPos()); + // seek to 0 read 0 bytes from it + instream.seek(0); + assertEquals(0, instream.read(buffer, 0, 0)); + } + + @Test + public void testReadFullyPastEOFZeroByteFile() throws Throwable { + assumeSupportsPositionedReadable(); + describe("readFully past the EOF of a 0 byte file"); + instream = getFileSystem().open(zeroByteFile); + byte[] buffer = new byte[1]; + // try to read past end of file + try { + instream.readFully(0, buffer, 0, 16); + fail("Expected an exception"); + } catch (IllegalArgumentException | IndexOutOfBoundsException + | EOFException e) { + // expected + } + } + + @Test + public void testReadFullySmallFile() throws Throwable { + describe("readFully operations"); + assumeSupportsPositionedReadable(); + instream = getFileSystem().open(smallSeekFile); + byte[] buffer = new byte[256]; + // expect negative length to fail + try { + instream.readFully(0, buffer, 0, -16); + fail("Expected an exception"); + } catch (IllegalArgumentException | IndexOutOfBoundsException e) { + // expected + } + // negative offset into buffer + try { + instream.readFully(0, buffer, -1, 16); + fail("Expected an exception"); + } catch (IllegalArgumentException | IndexOutOfBoundsException e) { + // expected + } + // expect negative position to fail, ideally with EOF + try { + instream.readFully(-1, buffer); + fail("Expected an exception"); + } catch (EOFException e) { + handleExpectedException(e); + } catch (IOException |IllegalArgumentException | IndexOutOfBoundsException e) { + handleRelaxedException("readFully with a negative position ", + "EOFException", + e); + } + + // read more than the offset allows + try { + instream.readFully(0, buffer, buffer.length - 8, 16); + fail("Expected an exception"); + } catch (IllegalArgumentException | IndexOutOfBoundsException e) { + // expected + } + + // read properly + assertEquals(0, instream.getPos()); + instream.readFully(0, buffer); + assertEquals(0, instream.getPos()); + + // now read the entire file in one go + byte[] fullFile = new byte[TEST_FILE_LEN]; + instream.readFully(0, fullFile); + assertEquals(0, instream.getPos()); + + try { + instream.readFully(16, fullFile); + fail("Expected an exception"); + } catch (EOFException e) { + handleExpectedException(e); + } catch (IOException e) { + handleRelaxedException("readFully which reads past EOF ", + "EOFException", + e); + } + } + + @Test + public void testReadFullyPastEOF() throws Throwable { + describe("readFully past the EOF of a file"); + assumeSupportsPositionedReadable(); + instream = getFileSystem().open(smallSeekFile); + byte[] 
buffer = new byte[256]; + + // now read past the end of the file + try { + instream.readFully(TEST_FILE_LEN + 1, buffer); + fail("Expected an exception"); + } catch (EOFException e) { + handleExpectedException(e); + } catch (IOException e) { + handleRelaxedException("readFully with an offset past EOF ", + "EOFException", + e); + } + // read zero bytes from an offset past EOF. + try { + instream.readFully(TEST_FILE_LEN + 1, buffer, 0, 0); + // a zero byte read may fail-fast + LOG.info("Filesystem short-circuits 0-byte reads"); + } catch (EOFException e) { + handleExpectedException(e); + } catch (IOException e) { + handleRelaxedException("readFully(0 bytes) with an offset past EOF ", + "EOFException", + e); + } + } + + @Test + public void testReadFullyZeroBytebufferPastEOF() throws Throwable { + describe("readFully zero bytes from an offset past EOF"); + assumeSupportsPositionedReadable(); + instream = getFileSystem().open(smallSeekFile); + byte[] buffer = new byte[256]; + try { + instream.readFully(TEST_FILE_LEN + 1, buffer, 0, 0); + // a zero byte read may fail-fast + LOG.info("Filesystem short-circuits 0-byte reads"); + } catch (EOFException e) { + handleExpectedException(e); + } catch (IOException e) { + handleRelaxedException("readFully(0 bytes) with an offset past EOF ", + "EOFException", + e); + } + } + + @Test + public void testReadNullBuffer() throws Throwable { + describe("try to read a null buffer "); + assumeSupportsPositionedReadable(); + try (FSDataInputStream in = getFileSystem().open(smallSeekFile)) { + // Null buffer + int r = in.read(0, null, 0, 16); + fail("Expected an exception from a read into a null buffer, got " + r); + } catch (IllegalArgumentException e) { + // expected + } + } + + @Test + public void testReadSmallFile() throws Throwable { + describe("PositionedRead.read operations"); + assumeSupportsPositionedReadable(); + instream = getFileSystem().open(smallSeekFile); + byte[] buffer = new byte[256]; + int r; + // expect negative length to fail + try { + r = instream.read(0, buffer, 0, -16); + fail("Expected an exception, got " + r); + } catch (IllegalArgumentException | IndexOutOfBoundsException e) { + // expected + } + // negative offset into buffer + try { + r = instream.read(0, buffer, -1, 16); + fail("Expected an exception, got " + r); + } catch (IllegalArgumentException | IndexOutOfBoundsException e) { + // expected + } + // negative position + try { + r = instream.read(-1, buffer, 0, 16); + fail("Expected an exception, got " + r); + } catch (EOFException e) { + handleExpectedException(e); + } catch (IOException | IllegalArgumentException | IndexOutOfBoundsException e) { + handleRelaxedException("read() with a negative position ", + "EOFException", + e); + } + + // read more than the offset allows + try { + r = instream.read(0, buffer, buffer.length - 8, 16); + fail("Expected an exception, got " + r); + } catch (IllegalArgumentException | IndexOutOfBoundsException e) { + // expected + } + + // read properly + assertEquals(0, instream.getPos()); + instream.readFully(0, buffer); + assertEquals(0, instream.getPos()); + + // now read the entire file in one go + byte[] fullFile = new byte[TEST_FILE_LEN]; + assertEquals(TEST_FILE_LEN, + instream.read(0, fullFile, 0, fullFile.length)); + assertEquals(0, instream.getPos()); + + // now read past the end of the file + assertEquals(-1, + instream.read(TEST_FILE_LEN + 16, buffer, 0, 1)); + } + + @Test + public void testReadAtExactEOF() throws Throwable { + describe("read at the end of the file"); + instream = 
getFileSystem().open(smallSeekFile); + instream.seek(TEST_FILE_LEN -1); + assertTrue("read at last byte", instream.read() > 0); + assertEquals("read just past EOF", -1, instream.read()); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java index a000ec8ed55..03bf2aa7e76 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractFSContractTestBase.java @@ -57,7 +57,7 @@ public abstract class AbstractFSContractTestBase extends Assert public static final int DEFAULT_TEST_TIMEOUT = 180 * 1000; /** - * The FS contract used for these tets + * The FS contract used for these tests */ private AbstractFSContract contract; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java index d8c259265fb..c8af0625858 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractOptions.java @@ -53,20 +53,20 @@ public interface ContractOptions { /** * Flag to indicate that the FS can rename into directories that * don't exist, creating them as needed. - * @{value} + * {@value} */ String RENAME_CREATES_DEST_DIRS = "rename-creates-dest-dirs"; /** * Flag to indicate that the FS does not follow the rename contract -and * instead only returns false on a failure. - * @{value} + * {@value} */ String RENAME_OVERWRITES_DEST = "rename-overwrites-dest"; /** * Flag to indicate that the FS returns false if the destination exists - * @{value} + * {@value} */ String RENAME_RETURNS_FALSE_IF_DEST_EXISTS = "rename-returns-false-if-dest-exists"; @@ -74,7 +74,7 @@ public interface ContractOptions { /** * Flag to indicate that the FS returns false on a rename * if the source is missing - * @{value} + * {@value} */ String RENAME_RETURNS_FALSE_IF_SOURCE_MISSING = "rename-returns-false-if-source-missing"; @@ -82,74 +82,74 @@ public interface ContractOptions { /** * Flag to indicate that the FS remove dest first if it is an empty directory * mean the FS honors POSIX rename behavior. - * @{value} + * {@value} */ String RENAME_REMOVE_DEST_IF_EMPTY_DIR = "rename-remove-dest-if-empty-dir"; /** * Flag to indicate that append is supported - * @{value} + * {@value} */ String SUPPORTS_APPEND = "supports-append"; /** * Flag to indicate that setTimes is supported. - * @{value} + * {@value} */ String SUPPORTS_SETTIMES = "supports-settimes"; /** * Flag to indicate that getFileStatus is supported. - * @{value} + * {@value} */ String SUPPORTS_GETFILESTATUS = "supports-getfilestatus"; /** * Flag to indicate that renames are atomic - * @{value} + * {@value} */ String SUPPORTS_ATOMIC_RENAME = "supports-atomic-rename"; /** * Flag to indicate that directory deletes are atomic - * @{value} + * {@value} */ String SUPPORTS_ATOMIC_DIRECTORY_DELETE = "supports-atomic-directory-delete"; /** * Does the FS support multiple block locations? - * @{value} + * {@value} */ String SUPPORTS_BLOCK_LOCALITY = "supports-block-locality"; /** * Does the FS support the concat() operation? 
- * @{value} + * {@value} */ String SUPPORTS_CONCAT = "supports-concat"; /** * Is seeking supported at all? - * @{value} + * {@value} */ String SUPPORTS_SEEK = "supports-seek"; /** * Is seeking past the EOF allowed? - * @{value} + * {@value} */ String REJECTS_SEEK_PAST_EOF = "rejects-seek-past-eof"; /** * Is seeking on a closed file supported? Some filesystems only raise an * exception later, when trying to read. - * @{value} + * {@value} */ String SUPPORTS_SEEK_ON_CLOSED_FILE = "supports-seek-on-closed-file"; /** * Is available() on a closed InputStream supported? - * @{value} + * {@value} */ String SUPPORTS_AVAILABLE_ON_CLOSED_FILE = "supports-available-on-closed-file"; @@ -157,32 +157,39 @@ public interface ContractOptions { * Flag to indicate that this FS expects to throw the strictest * exceptions it can, not generic IOEs, which, if returned, * must be rejected. - * @{value} + * {@value} */ String SUPPORTS_STRICT_EXCEPTIONS = "supports-strict-exceptions"; /** * Are unix permissions - * @{value} + * {@value} */ String SUPPORTS_UNIX_PERMISSIONS = "supports-unix-permissions"; + /** + * Is positioned readable supported? Supporting seek should be sufficient + * for this. + * {@value} + */ + String SUPPORTS_POSITIONED_READABLE = "supports-positioned-readable"; + /** * Maximum path length - * @{value} + * {@value} */ String MAX_PATH_ = "max-path"; /** * Maximum filesize: 0 or -1 for no limit - * @{value} + * {@value} */ String MAX_FILESIZE = "max-filesize"; /** * Flag to indicate that tests on the root directories of a filesystem/ * object store are permitted - * @{value} + * {@value} */ String TEST_ROOT_TESTS_ENABLED = "test.root-tests-enabled"; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 3f16724ec26..6343d40ee8f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; import org.junit.Assert; import org.junit.internal.AssumptionViolatedException; import org.slf4j.Logger; @@ -432,9 +433,7 @@ public static void skip(String message) { * @throws AssertionError with the text and throwable -always */ public static void fail(String text, Throwable thrown) { - AssertionError e = new AssertionError(text); - e.initCause(thrown); - throw e; + throw new AssertionError(text, thrown); } /** @@ -509,10 +508,14 @@ public static void createFile(FileSystem fs, boolean overwrite, byte[] data) throws IOException { FSDataOutputStream stream = fs.create(path, overwrite); - if (data != null && data.length > 0) { - stream.write(data); + try { + if (data != null && data.length > 0) { + stream.write(data); + } + stream.close(); + } finally { + IOUtils.closeStream(stream); } - stream.close(); } /** @@ -574,13 +577,10 @@ public static void assertDeleted(FileSystem fs, public static String readBytesToString(FileSystem fs, Path path, int length) throws IOException { - FSDataInputStream in = fs.open(path); - try { + try (FSDataInputStream in = fs.open(path)) { byte[] buf = new byte[length]; in.readFully(0, buf); return toChar(buf); - } finally { - in.close(); } } @@ -786,8 +786,7 @@ public static void 
verifyReceivedData(FileSystem fs, Path path, long totalBytesRead = 0; int nextExpectedNumber = 0; - final InputStream inputStream = fs.open(path); - try { + try (InputStream inputStream = fs.open(path)) { while (true) { final int bytesRead = inputStream.read(testBuffer); if (bytesRead < 0) { @@ -814,8 +813,6 @@ public static void verifyReceivedData(FileSystem fs, Path path, throw new IOException("Expected to read " + expectedSize + " bytes but only received " + totalBytesRead); } - } finally { - inputStream.close(); } } diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml index b2e068c41e3..b261a63be7d 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/localfs.xml @@ -100,7 +100,7 @@ case sensitivity and permission options are determined at run time from OS type true - + fs.contract.rejects-seek-past-eof true @@ -121,4 +121,4 @@ case sensitivity and permission options are determined at run time from OS type true - \ No newline at end of file + diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 040055e57c6..50a6a43fc04 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -1473,6 +1473,10 @@ protected static boolean tokenRefetchNeeded(IOException ex, @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { + validatePositionedReadArgs(position, buffer, offset, length); + if (length == 0) { + return 0; + } try (TraceScope scope = dfsClient. newReaderTraceScope("DFSInputStream#byteArrayPread", src, position, length)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java index ccdfad4fcec..aa88bb78e6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java @@ -28,6 +28,7 @@ import java.util.StringTokenizer; import org.apache.commons.io.input.BoundedInputStream; +import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.http.HttpStatus; @@ -129,6 +130,9 @@ protected InputStream getInputStream() throws IOException { @VisibleForTesting protected InputStreamAndFileLength openInputStream(long startOffset) throws IOException { + if (startOffset < 0) { + throw new EOFException("Negative Position"); + } // Use the original url if no resolved url exists, eg. if // it's the first time a request is made. 
@@ -250,6 +254,10 @@ public void seek(long pos) throws IOException {
   @Override
   public int read(long position, byte[] buffer, int offset, int length)
       throws IOException {
+    validatePositionedReadArgs(position, buffer, offset, length);
+    if (length == 0) {
+      return 0;
+    }
     try (InputStream in = openInputStream(position).in) {
       return in.read(buffer, offset, length);
     }
@@ -258,17 +266,21 @@ public int read(long position, byte[] buffer, int offset, int length)
   @Override
   public void readFully(long position, byte[] buffer, int offset, int length)
       throws IOException {
-    final InputStreamAndFileLength fin = openInputStream(position);
-    if (fin.length != null && length + position > fin.length) {
-      throw new EOFException("The length to read " + length
-          + " exceeds the file length " + fin.length);
+    validatePositionedReadArgs(position, buffer, offset, length);
+    if (length == 0) {
+      return;
     }
+    final InputStreamAndFileLength fin = openInputStream(position);
     try {
+      if (fin.length != null && length + position > fin.length) {
+        throw new EOFException("The length to read " + length
+            + " exceeds the file length " + fin.length);
+      }
       int nread = 0;
       while (nread < length) {
         int nbytes = fin.in.read(buffer, offset + nread, length - nread);
         if (nbytes < 0) {
-          throw new EOFException("End of file reached before reading fully.");
+          throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
         }
         nread += nbytes;
       }
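The readFully() body above, and the S3A override that follows, share the same fill-or-fail loop: InputStream.read() may legitimately return fewer bytes than requested, so a single call is not enough, and the loop keeps reading until the requested range is satisfied or end of stream is hit. A standalone sketch of just that loop; the class and method packaging is mine, not part of the patch:

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Sketch of the fill-or-fail loop used by the readFully() overrides. */
final class ReadFullyLoop {

  /**
   * Read exactly {@code length} bytes into {@code buffer} at {@code offset},
   * or fail with EOFException if the stream ends first.
   */
  static void readFully(InputStream in, byte[] buffer, int offset, int length)
      throws IOException {
    int nread = 0;
    while (nread < length) {
      // read() may return any value in [0, length - nread]; keep going.
      int nbytes = in.read(buffer, offset + nread, length - nread);
      if (nbytes < 0) {
        // Stream ended before the requested range was satisfied.
        throw new EOFException("End of file reached before reading fully");
      }
      nread += nbytes;
    }
  }
}
```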
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 685026e305c..750358d9574 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -137,6 +137,8 @@ public synchronized int read() throws IOException {
       LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
       reopen(pos);
       byteRead = wrappedStream.read();
+    } catch (EOFException e) {
+      return -1;
     }
 
     if (byteRead >= 0) {
@@ -216,4 +218,42 @@ public synchronized int available() throws IOException {
   public boolean markSupported() {
     return false;
   }
+
+  /**
+   * Subclass {@code readFully()} operation which only seeks at the start
+   * of the series of operations; seeking back at the end.
+   *
+   * This delivers significantly higher performance if multiple read attempts
+   * are needed to fetch the data, as it does not break the HTTP connection.
+   *
+   * To maintain thread safety requirements, this operation is synchronized
+   * for the duration of the sequence.
+   * {@inheritDoc}
+   *
+   */
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    validatePositionedReadArgs(position, buffer, offset, length);
+    if (length == 0) {
+      return;
+    }
+    int nread = 0;
+    synchronized (this) {
+      long oldPos = getPos();
+      try {
+        seek(position);
+        while (nread < length) {
+          int nbytes = read(buffer, offset + nread, length - nread);
+          if (nbytes < 0) {
+            throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
+          }
+          nread += nbytes;
+        }
+      } finally {
+        seek(oldPos);
+      }
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 855774aecef..a8321436189 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -725,6 +725,8 @@ public synchronized int read() throws FileNotFoundException, IOException {
       // Return to the caller with the result.
       //
       return result;
+    } catch(EOFException e) {
+      return -1;
     } catch(IOException e) {
       Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
@@ -773,7 +775,7 @@ public synchronized int read(byte[] b, int off, int len) throws FileNotFoundExce
         pos += result;
       }
 
-      if (null != statistics) {
+      if (null != statistics && result > 0) {
         statistics.incrementBytesRead(result);
       }
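Both the S3A and Azure hunks above map an EOFException caught on the read path to the standard java.io.InputStream end-of-stream value, -1, instead of letting it propagate as a failure; with lazy seek, a position past the end of the object only surfaces when the next read actually touches the store. A minimal illustration of the translation, assuming a generic wrapped stream rather than either store's client:

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/**
 * Sketch of the EOF-translation pattern: a read past the end of the
 * underlying source throws EOFException, which the wrapper converts
 * into the InputStream contract's end-of-stream signal, -1.
 */
class EofTranslatingStream extends InputStream {
  private final InputStream wrapped;

  EofTranslatingStream(InputStream wrapped) {
    this.wrapped = wrapped;
  }

  @Override
  public int read() throws IOException {
    try {
      return wrapped.read();
    } catch (EOFException e) {
      // Report "no more data" rather than a failure, per the
      // java.io.InputStream contract for read().
      return -1;
    }
  }
}
```

The companion change guarding statistics.incrementBytesRead(result) follows the same contract: when read() returns -1 there are no bytes to count, so only positive results are recorded.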