HADOOP-12994. Specify PositionedReadable, add contract tests, fix problems. Contributed by Steve Loughran.

(cherry picked from commit 843ee8d59d)
This commit is contained in:
Chris Nauroth 2016-04-08 13:36:58 -07:00
parent 77a75de319
commit 26a23eff82
28 changed files with 575 additions and 174 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.crypto; package org.apache.hadoop.crypto;
import java.io.EOFException;
import java.io.FileDescriptor; import java.io.FileDescriptor;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FilterInputStream; import java.io.FilterInputStream;
@ -34,6 +35,7 @@
import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.HasFileDescriptor; import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.PositionedReadable;
@ -395,7 +397,9 @@ public void readFully(long position, byte[] buffer) throws IOException {
/** Seek to a position. */ /** Seek to a position. */
@Override @Override
public void seek(long pos) throws IOException { public void seek(long pos) throws IOException {
Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset."); if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
checkStream(); checkStream();
try { try {
/* /*

View File

@ -182,20 +182,18 @@ public int available() throws IOException {
public int read(long position, byte[] b, int off, int len) public int read(long position, byte[] b, int off, int len)
throws IOException { throws IOException {
// parameter check // parameter check
if ((off | len | (off + len) | (b.length - (off + len))) < 0) { validatePositionedReadArgs(position, b, off, len);
throw new IndexOutOfBoundsException(); if (len == 0) {
} else if (len == 0) {
return 0; return 0;
} }
if( position<0 ) {
throw new IllegalArgumentException(
"Parameter position can not to be negative");
}
ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file); int nread;
checker.seek(position); try (ChecksumFSInputChecker checker =
int nread = checker.read(b, off, len); new ChecksumFSInputChecker(fs, file)) {
checker.close(); checker.seek(position);
nread = checker.read(b, off, len);
checker.close();
}
return nread; return nread;
} }

View File

@ -169,20 +169,18 @@ public int available() throws IOException {
public int read(long position, byte[] b, int off, int len) public int read(long position, byte[] b, int off, int len)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
// parameter check // parameter check
if ((off | len | (off + len) | (b.length - (off + len))) < 0) { validatePositionedReadArgs(position, b, off, len);
throw new IndexOutOfBoundsException(); if (len == 0) {
} else if (len == 0) {
return 0; return 0;
} }
if (position<0) {
throw new IllegalArgumentException(
"Parameter position can not to be negative");
}
ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file); int nread;
checker.seek(position); try (ChecksumFSInputChecker checker =
int nread = checker.read(b, off, len); new ChecksumFSInputChecker(fs, file)) {
checker.close(); checker.seek(position);
nread = checker.read(b, off, len);
checker.close();
}
return nread; return nread;
} }

View File

@ -18,18 +18,21 @@
*/ */
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import java.io.*; import java.io.DataInputStream;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.EnumSet; import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.util.IdentityHashStore; import org.apache.hadoop.util.IdentityHashStore;
/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream} /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
* and buffers input through a {@link BufferedInputStream}. */ * and buffers input through a {@link java.io.BufferedInputStream}. */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public class FSDataInputStream extends DataInputStream public class FSDataInputStream extends DataInputStream
@ -97,6 +100,7 @@ public int read(long position, byte[] buffer, int offset, int length)
* @param buffer buffer into which data is read * @param buffer buffer into which data is read
* @param offset offset into the buffer in which data is written * @param offset offset into the buffer in which data is written
* @param length the number of bytes to read * @param length the number of bytes to read
* @throws IOException IO problems
* @throws EOFException If the end of stream is reached while reading. * @throws EOFException If the end of stream is reached while reading.
* If an exception is thrown an undetermined number * If an exception is thrown an undetermined number
* of bytes in the buffer may have been written. * of bytes in the buffer may have been written.

View File

@ -40,4 +40,10 @@ public class FSExceptionMessages {
*/ */
public static final String CANNOT_SEEK_PAST_EOF = public static final String CANNOT_SEEK_PAST_EOF =
"Attempted to seek or read past the end of the file"; "Attempted to seek or read past the end of the file";
public static final String EOF_IN_READ_FULLY =
"End of file reached before reading fully.";
public static final String TOO_MANY_BYTES_FOR_DEST_BUFFER
= "Requested more bytes than destination buffer size";
} }

View File

@ -17,22 +17,28 @@
*/ */
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import java.io.*; import java.io.EOFException;
import java.nio.ByteBuffer; import java.io.IOException;
import java.io.InputStream;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ZeroCopyUnavailableException; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**************************************************************** /****************************************************************
* FSInputStream is a generic old InputStream with a little bit * FSInputStream is a generic old InputStream with a little bit
* of RAF-style seek ability. * of RAF-style seek ability.
* *
*****************************************************************/ *****************************************************************/
@InterfaceAudience.LimitedPrivate({"HDFS"}) @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Evolving
public abstract class FSInputStream extends InputStream public abstract class FSInputStream extends InputStream
implements Seekable, PositionedReadable { implements Seekable, PositionedReadable {
private static final Logger LOG =
LoggerFactory.getLogger(FSInputStream.class);
/** /**
* Seek to the given offset from the start of the file. * Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't * The next read() will be from that location. Can't
@ -57,12 +63,21 @@ public abstract class FSInputStream extends InputStream
@Override @Override
public int read(long position, byte[] buffer, int offset, int length) public int read(long position, byte[] buffer, int offset, int length)
throws IOException { throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return 0;
}
synchronized (this) { synchronized (this) {
long oldPos = getPos(); long oldPos = getPos();
int nread = -1; int nread = -1;
try { try {
seek(position); seek(position);
nread = read(buffer, offset, length); nread = read(buffer, offset, length);
} catch (EOFException e) {
// end of file; this can be raised by some filesystems
// (often: object stores); it is swallowed here.
LOG.debug("Downgrading EOFException raised trying to" +
" read {} bytes at offset {}", length, offset, e);
} finally { } finally {
seek(oldPos); seek(oldPos);
} }
@ -70,14 +85,42 @@ public int read(long position, byte[] buffer, int offset, int length)
} }
} }
/**
* Validation code, available for use in subclasses.
* @param position position: if negative an EOF exception is raised
* @param buffer destination buffer
* @param offset offset within the buffer
* @param length length of bytes to read
* @throws EOFException if the position is negative
* @throws IndexOutOfBoundsException if there isn't space for the amount of
* data requested.
* @throws IllegalArgumentException other arguments are invalid.
*/
protected void validatePositionedReadArgs(long position,
byte[] buffer, int offset, int length) throws EOFException {
Preconditions.checkArgument(length >= 0, "length is negative");
if (position < 0) {
throw new EOFException("position is negative");
}
Preconditions.checkArgument(buffer != null, "Null buffer");
if (buffer.length - offset < length) {
throw new IndexOutOfBoundsException(
FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER);
}
}
@Override @Override
public void readFully(long position, byte[] buffer, int offset, int length) public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException { throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
int nread = 0; int nread = 0;
while (nread < length) { while (nread < length) {
int nbytes = read(position+nread, buffer, offset+nread, length-nread); int nbytes = read(position + nread,
buffer,
offset + nread,
length - nread);
if (nbytes < 0) { if (nbytes < 0) {
throw new EOFException("End of file reached before reading fully."); throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
} }
nread += nbytes; nread += nbytes;
} }

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.util.LineReader; import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import java.io.EOFException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
@ -1053,17 +1054,16 @@ public int read(long pos, byte[] b, int offset, int length)
@Override @Override
public void readFully(long pos, byte[] b, int offset, int length) public void readFully(long pos, byte[] b, int offset, int length)
throws IOException { throws IOException {
validatePositionedReadArgs(pos, b, offset, length);
if (length == 0) {
return;
}
if (start + length + pos > end) { if (start + length + pos > end) {
throw new IOException("Not enough bytes to read."); throw new EOFException("Not enough bytes to read.");
} }
underLyingStream.readFully(pos + start, b, offset, length); underLyingStream.readFully(pos + start, b, offset, length);
} }
@Override
public void readFully(long pos, byte[] b) throws IOException {
readFully(pos, b, 0, b.length);
}
@Override @Override
public void setReadahead(Long readahead) throws IOException { public void setReadahead(Long readahead) throws IOException {
underLyingStream.setReadahead(readahead); underLyingStream.setReadahead(readahead);

View File

@ -22,30 +22,67 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
/** Stream that permits positional reading. */ /**
* Stream that permits positional reading.
*
* Implementations are required to implement thread-safe operations; this may
* be supported by concurrent access to the data, or by using a synchronization
* mechanism to serialize access.
*
* Not all implementations meet this requirement. Those that do not cannot
* be used as a backing store for some applications, such as Apache HBase.
*
* Independent of whether or not they are thread safe, some implementations
* may make the intermediate state of the system, specifically the position
* obtained in {@code Seekable.getPos()} visible.
*/
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface PositionedReadable { public interface PositionedReadable {
/** /**
* Read upto the specified number of bytes, from a given * Read up to the specified number of bytes, from a given
* position within a file, and return the number of bytes read. This does not * position within a file, and return the number of bytes read. This does not
* change the current offset of a file, and is thread-safe. * change the current offset of a file, and is thread-safe.
*
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
* @param position position within file
* @param buffer destination buffer
* @param offset offset in the buffer
* @param length number of bytes to read
* @return actual number of bytes read; -1 means "none"
* @throws IOException IO problems.
*/ */
public int read(long position, byte[] buffer, int offset, int length) int read(long position, byte[] buffer, int offset, int length)
throws IOException; throws IOException;
/** /**
* Read the specified number of bytes, from a given * Read the specified number of bytes, from a given
* position within a file. This does not * position within a file. This does not
* change the current offset of a file, and is thread-safe. * change the current offset of a file, and is thread-safe.
*
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
* @param position position within file
* @param buffer destination buffer
* @param offset offset in the buffer
* @param length number of bytes to read
* @throws IOException IO problems.
* @throws EOFException the end of the data was reached before
* the read operation completed
*/ */
public void readFully(long position, byte[] buffer, int offset, int length) void readFully(long position, byte[] buffer, int offset, int length)
throws IOException; throws IOException;
/** /**
* Read number of bytes equal to the length of the buffer, from a given * Read number of bytes equal to the length of the buffer, from a given
* position within a file. This does not * position within a file. This does not
* change the current offset of a file, and is thread-safe. * change the current offset of a file, and is thread-safe.
*
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
* @param position position within file
* @param buffer destination buffer
* @throws IOException IO problems.
* @throws EOFException the end of the data was reached before
* the read operation completed
*/ */
public void readFully(long position, byte[] buffer) throws IOException; void readFully(long position, byte[] buffer) throws IOException;
} }

View File

@ -157,6 +157,8 @@ public int read() throws IOException {
@Override @Override
public int read(byte[] b, int off, int len) throws IOException { public int read(byte[] b, int off, int len) throws IOException {
// parameter check
validatePositionedReadArgs(position, b, off, len);
try { try {
int value = fis.read(b, off, len); int value = fis.read(b, off, len);
if (value > 0) { if (value > 0) {
@ -172,6 +174,12 @@ public int read(byte[] b, int off, int len) throws IOException {
@Override @Override
public int read(long position, byte[] b, int off, int len) public int read(long position, byte[] b, int off, int len)
throws IOException { throws IOException {
// parameter check
validatePositionedReadArgs(position, b, off, len);
if (len == 0) {
return 0;
}
ByteBuffer bb = ByteBuffer.wrap(b, off, len); ByteBuffer bb = ByteBuffer.wrap(b, off, len);
try { try {
int value = fis.getChannel().read(bb, position); int value = fis.getChannel().read(bb, position);

View File

@ -279,6 +279,9 @@ on the underlying stream:
read(dest3, ... len3) -> dest3[0..len3 - 1] = read(dest3, ... len3) -> dest3[0..len3 - 1] =
[data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1] [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]
Note that implementations are not required to be atomic; the intermediate state
of the operation (the change in the value of `getPos()`) may be visible.
#### Implementation preconditions #### Implementation preconditions
Not all `FSDataInputStream` implementations support these operations. Those that do Not all `FSDataInputStream` implementations support these operations. Those that do
@ -287,7 +290,7 @@ interface.
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException] supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
This could be considered obvious: if a stream is not Seekable, a client This could be considered obvious: if a stream is not `Seekable`, a client
cannot seek to a location. It is also a side effect of the cannot seek to a location. It is also a side effect of the
base class implementation, which uses `Seekable.seek()`. base class implementation, which uses `Seekable.seek()`.
@ -304,14 +307,14 @@ For any operations that fail, the contents of the destination
`buffer` are undefined. Implementations may overwrite part `buffer` are undefined. Implementations may overwrite part
or all of the buffer before reporting a failure. or all of the buffer before reporting a failure.
### `int PositionedReadable.read(position, buffer, offset, length)` ### `int PositionedReadable.read(position, buffer, offset, length)`
Read as much data as possible into the buffer space allocated for it.
#### Preconditions #### Preconditions
position > 0 else raise [IllegalArgumentException, RuntimeException] position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
len(buffer) + offset < len(data) else raise [IndexOutOfBoundException, RuntimeException] len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
length >= 0 length >= 0
offset >= 0 offset >= 0
@ -324,23 +327,36 @@ of data available from the specified position:
buffer'[offset..(offset+available-1)] = data[position..position+available -1] buffer'[offset..(offset+available-1)] = data[position..position+available -1]
result = available result = available
1. A return value of -1 means that the stream had no more available data.
1. An invocation with `length==0` implicitly does not read any data;
implementations may short-cut the operation and omit any IO. In such instances,
checks for the stream being at the end of the file may be omitted.
1. If an IO exception occurs during the read operation(s),
the final state of `buffer` is undefined.
### `void PositionedReadable.readFully(position, buffer, offset, length)` ### `void PositionedReadable.readFully(position, buffer, offset, length)`
Read exactly `length` bytes of data into the buffer, failing if there is not
enough data available.
#### Preconditions #### Preconditions
position > 0 else raise [IllegalArgumentException, RuntimeException] position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
length >= 0 length >= 0
offset >= 0 offset >= 0
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
(position + length) <= len(data) else raise [EOFException, IOException] (position + length) <= len(data) else raise [EOFException, IOException]
len(buffer) + offset < len(data)
If an IO exception occurs during the read operation(s),
the final state of `buffer` is undefined.
If there is not enough data in the input stream to satisfy the requests,
the final state of `buffer` is undefined.
#### Postconditions #### Postconditions
The amount of data read is the less of the length or the amount The buffer from offset `offset` is filled with the data starting at `position`
of data available from the specified position:
let available = min(length, len(data)-position)
buffer'[offset..(offset+length-1)] = data[position..(position + length -1)] buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]
### `PositionedReadable.readFully(position, buffer)` ### `PositionedReadable.readFully(position, buffer)`
@ -349,6 +365,9 @@ The semantics of this are exactly equivalent to
readFully(position, buffer, 0, len(buffer)) readFully(position, buffer, 0, len(buffer))
That is, the buffer is filled entirely with the contents of the input source
from position `position`
## Consistency ## Consistency

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.crypto; package org.apache.hadoop.crypto;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -29,6 +30,7 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.ReadOption;
@ -339,7 +341,7 @@ private void readFullyCheck(InputStream in, int pos) throws Exception {
try { try {
((PositionedReadable) in).readFully(pos, result); ((PositionedReadable) in).readFully(pos, result);
Assert.fail("Read fully exceeds maximum length should fail."); Assert.fail("Read fully exceeds maximum length should fail.");
} catch (IOException e) { } catch (EOFException e) {
} }
} }
@ -365,9 +367,9 @@ public void testSeek() throws Exception {
try { try {
seekCheck(in, -3); seekCheck(in, -3);
Assert.fail("Seek to negative offset should fail."); Assert.fail("Seek to negative offset should fail.");
} catch (IllegalArgumentException e) { } catch (EOFException e) {
GenericTestUtils.assertExceptionContains("Cannot seek to negative " + GenericTestUtils.assertExceptionContains(
"offset", e); FSExceptionMessages.NEGATIVE_SEEK, e);
} }
Assert.assertEquals(pos, ((Seekable) in).getPos()); Assert.assertEquals(pos, ((Seekable) in).getPos());

View File

@ -52,11 +52,8 @@ public void setup() throws Exception {
public void testAppendToEmptyFile() throws Throwable { public void testAppendToEmptyFile() throws Throwable {
touch(getFileSystem(), target); touch(getFileSystem(), target);
byte[] dataset = dataset(256, 'a', 'z'); byte[] dataset = dataset(256, 'a', 'z');
FSDataOutputStream outputStream = getFileSystem().append(target); try (FSDataOutputStream outputStream = getFileSystem().append(target)) {
try {
outputStream.write(dataset); outputStream.write(dataset);
} finally {
outputStream.close();
} }
byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target, byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target,
dataset.length); dataset.length);

View File

@ -53,7 +53,7 @@ public void setup() throws Exception {
target = new Path(testPath, "target"); target = new Path(testPath, "target");
byte[] block = dataset(TEST_FILE_LEN, 0, 255); byte[] block = dataset(TEST_FILE_LEN, 0, 255);
createFile(getFileSystem(), srcFile, false, block); createFile(getFileSystem(), srcFile, true, block);
touch(getFileSystem(), zeroByteFile); touch(getFileSystem(), zeroByteFile);
} }

View File

@ -123,7 +123,7 @@ public void testOverwriteNonEmptyDirectory() throws Throwable {
} catch (AssertionError failure) { } catch (AssertionError failure) {
if (isSupported(IS_BLOBSTORE)) { if (isSupported(IS_BLOBSTORE)) {
// file/directory hack surfaces here // file/directory hack surfaces here
throw new AssumptionViolatedException(failure.toString()).initCause(failure); throw new AssumptionViolatedException(failure.toString(), failure);
} }
// else: rethrow // else: rethrow
throw failure; throw failure;
@ -163,13 +163,11 @@ public void testOverwriteNonEmptyDirectory() throws Throwable {
public void testCreatedFileIsImmediatelyVisible() throws Throwable { public void testCreatedFileIsImmediatelyVisible() throws Throwable {
describe("verify that a newly created file exists as soon as open returns"); describe("verify that a newly created file exists as soon as open returns");
Path path = path("testCreatedFileIsImmediatelyVisible"); Path path = path("testCreatedFileIsImmediatelyVisible");
FSDataOutputStream out = null; try(FSDataOutputStream out = getFileSystem().create(path,
try {
out = getFileSystem().create(path,
false, false,
4096, 4096,
(short) 1, (short) 1,
1024); 1024)) {
if (!getFileSystem().exists(path)) { if (!getFileSystem().exists(path)) {
if (isSupported(IS_BLOBSTORE)) { if (isSupported(IS_BLOBSTORE)) {
@ -180,8 +178,6 @@ public void testCreatedFileIsImmediatelyVisible() throws Throwable {
assertPathExists("expected path to be visible before anything written", assertPathExists("expected path to be visible before anything written",
path); path);
} }
} finally {
IOUtils.closeStream(out);
} }
} }
} }

View File

@ -47,7 +47,7 @@ public void testDeleteEmptyDirRecursive() throws Throwable {
@Test @Test
public void testDeleteNonexistentPathRecursive() throws Throwable { public void testDeleteNonexistentPathRecursive() throws Throwable {
Path path = path("testDeleteNonexistentPathRecursive"); Path path = path("testDeleteNonexistentPathRecursive");
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "leftover", path); assertPathDoesNotExist("leftover", path);
ContractTestUtils.rejectRootOperation(path); ContractTestUtils.rejectRootOperation(path);
assertFalse("Returned true attempting to delete" assertFalse("Returned true attempting to delete"
+ " a nonexistent path " + path, + " a nonexistent path " + path,
@ -58,7 +58,7 @@ public void testDeleteNonexistentPathRecursive() throws Throwable {
@Test @Test
public void testDeleteNonexistentPathNonRecursive() throws Throwable { public void testDeleteNonexistentPathNonRecursive() throws Throwable {
Path path = path("testDeleteNonexistentPathNonRecursive"); Path path = path("testDeleteNonexistentPathNonRecursive");
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "leftover", path); assertPathDoesNotExist("leftover", path);
ContractTestUtils.rejectRootOperation(path); ContractTestUtils.rejectRootOperation(path);
assertFalse("Returned true attempting to recursively delete" assertFalse("Returned true attempting to recursively delete"
+ " a nonexistent path " + path, + " a nonexistent path " + path,
@ -81,7 +81,7 @@ public void testDeleteNonEmptyDirNonRecursive() throws Throwable {
//expected //expected
handleExpectedException(expected); handleExpectedException(expected);
} }
ContractTestUtils.assertIsDirectory(getFileSystem(), path); assertIsDirectory(path);
} }
@Test @Test
@ -92,7 +92,7 @@ public void testDeleteNonEmptyDirRecursive() throws Throwable {
ContractTestUtils.writeTextFile(getFileSystem(), file, "goodbye, world", ContractTestUtils.writeTextFile(getFileSystem(), file, "goodbye, world",
true); true);
assertDeleted(path, true); assertDeleted(path, true);
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "not deleted", file); assertPathDoesNotExist("not deleted", file);
} }
@Test @Test
@ -100,12 +100,11 @@ public void testDeleteDeepEmptyDir() throws Throwable {
mkdirs(path("testDeleteDeepEmptyDir/d1/d2/d3/d4")); mkdirs(path("testDeleteDeepEmptyDir/d1/d2/d3/d4"));
assertDeleted(path("testDeleteDeepEmptyDir/d1/d2/d3"), true); assertDeleted(path("testDeleteDeepEmptyDir/d1/d2/d3"), true);
FileSystem fs = getFileSystem(); assertPathDoesNotExist(
ContractTestUtils.assertPathDoesNotExist(fs,
"not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3/d4")); "not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3/d4"));
ContractTestUtils.assertPathDoesNotExist(fs, assertPathDoesNotExist(
"not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3")); "not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3"));
ContractTestUtils.assertPathExists(fs, "parent dir is deleted", assertPathExists( "parent dir is deleted",
path("testDeleteDeepEmptyDir/d1/d2")); path("testDeleteDeepEmptyDir/d1/d2"));
} }
@ -117,8 +116,7 @@ public void testDeleteSingleFile() throws Throwable {
Path file = new Path(path, "childfile"); Path file = new Path(path, "childfile");
ContractTestUtils.writeTextFile(getFileSystem(), file, ContractTestUtils.writeTextFile(getFileSystem(), file,
"single file to be deleted.", true); "single file to be deleted.", true);
ContractTestUtils.assertPathExists(getFileSystem(), assertPathExists("single file not created", file);
"single file not created", file);
assertDeleted(file, false); assertDeleted(file, false);
} }
} }

View File

@ -67,12 +67,9 @@ public void testNoMkdirOverFile() throws Throwable {
boolean made = fs.mkdirs(path); boolean made = fs.mkdirs(path);
fail("mkdirs did not fail over a file but returned " + made fail("mkdirs did not fail over a file but returned " + made
+ "; " + ls(path)); + "; " + ls(path));
} catch (ParentNotDirectoryException e) { } catch (ParentNotDirectoryException | FileAlreadyExistsException e) {
//parent is a directory //parent is a directory
handleExpectedException(e); handleExpectedException(e);
} catch (FileAlreadyExistsException e) {
//also allowed as an exception (HDFS)
handleExpectedException(e);;
} catch (IOException e) { } catch (IOException e) {
//here the FS says "no create" //here the FS says "no create"
handleRelaxedException("mkdirs", "FileAlreadyExistsException", e); handleRelaxedException("mkdirs", "FileAlreadyExistsException", e);
@ -97,11 +94,9 @@ public void testMkdirOverParentFile() throws Throwable {
boolean made = fs.mkdirs(child); boolean made = fs.mkdirs(child);
fail("mkdirs did not fail over a file but returned " + made fail("mkdirs did not fail over a file but returned " + made
+ "; " + ls(path)); + "; " + ls(path));
} catch (ParentNotDirectoryException e) { } catch (ParentNotDirectoryException | FileAlreadyExistsException e) {
//parent is a directory //parent is a directory
handleExpectedException(e); handleExpectedException(e);
} catch (FileAlreadyExistsException e) {
handleExpectedException(e);
} catch (IOException e) { } catch (IOException e) {
handleRelaxedException("mkdirs", "ParentNotDirectoryException", e); handleRelaxedException("mkdirs", "ParentNotDirectoryException", e);
} }

View File

@ -125,10 +125,10 @@ public void testOpenFileTwice() throws Throwable {
createFile(getFileSystem(), path, false, block); createFile(getFileSystem(), path, false, block);
//open first //open first
FSDataInputStream instream1 = getFileSystem().open(path); FSDataInputStream instream1 = getFileSystem().open(path);
int c = instream1.read();
assertEquals(0,c);
FSDataInputStream instream2 = null; FSDataInputStream instream2 = null;
try { try {
int c = instream1.read();
assertEquals(0,c);
instream2 = getFileSystem().open(path); instream2 = getFileSystem().open(path);
assertEquals("first read of instream 2", 0, instream2.read()); assertEquals("first read of instream 2", 0, instream2.read());
assertEquals("second read of instream 1", 1, instream1.read()); assertEquals("second read of instream 1", 1, instream1.read());

View File

@ -26,8 +26,7 @@
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
/** /**
* Test creating files, overwrite options &c * Test creating files, overwrite options &c
@ -46,9 +45,9 @@ public void testRenameNewFileSameDir() throws Throwable {
boolean rename = rename(renameSrc, renameTarget); boolean rename = rename(renameSrc, renameTarget);
assertTrue("rename("+renameSrc+", "+ renameTarget+") returned false", assertTrue("rename("+renameSrc+", "+ renameTarget+") returned false",
rename); rename);
ContractTestUtils.assertListStatusFinds(getFileSystem(), assertListStatusFinds(getFileSystem(),
renameTarget.getParent(), renameTarget); renameTarget.getParent(), renameTarget);
ContractTestUtils.verifyFileContents(getFileSystem(), renameTarget, data); verifyFileContents(getFileSystem(), renameTarget, data);
} }
@Test @Test
@ -129,7 +128,7 @@ public void testRenameFileOverExistingFile() throws Throwable {
} }
// verify that the destination file is as expected based on the expected // verify that the destination file is as expected based on the expected
// outcome // outcome
ContractTestUtils.verifyFileContents(getFileSystem(), destFile, verifyFileContents(getFileSystem(), destFile,
destUnchanged? destData: srcData); destUnchanged? destData: srcData);
} }
@ -154,7 +153,7 @@ public void testRenameDirIntoExistingDir() throws Throwable {
Path renamedSrc = new Path(destDir, sourceSubdir); Path renamedSrc = new Path(destDir, sourceSubdir);
assertIsFile(destFilePath); assertIsFile(destFilePath);
assertIsDirectory(renamedSrc); assertIsDirectory(renamedSrc);
ContractTestUtils.verifyFileContents(fs, destFilePath, destDateset); verifyFileContents(fs, destFilePath, destDateset);
assertTrue("rename returned false though the contents were copied", rename); assertTrue("rename returned false though the contents were copied", rename);
} }
@ -172,10 +171,10 @@ public void testRenameFileNonexistentDir() throws Throwable {
boolean rename = rename(renameSrc, renameTarget); boolean rename = rename(renameSrc, renameTarget);
if (renameCreatesDestDirs) { if (renameCreatesDestDirs) {
assertTrue(rename); assertTrue(rename);
ContractTestUtils.verifyFileContents(getFileSystem(), renameTarget, data); verifyFileContents(getFileSystem(), renameTarget, data);
} else { } else {
assertFalse(rename); assertFalse(rename);
ContractTestUtils.verifyFileContents(getFileSystem(), renameSrc, data); verifyFileContents(getFileSystem(), renameSrc, data);
} }
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
// allowed unless that rename flag is set // allowed unless that rename flag is set
@ -191,36 +190,36 @@ public void testRenameWithNonEmptySubDir() throws Throwable {
final Path finalDir = new Path(renameTestDir, "dest"); final Path finalDir = new Path(renameTestDir, "dest");
FileSystem fs = getFileSystem(); FileSystem fs = getFileSystem();
boolean renameRemoveEmptyDest = isSupported(RENAME_REMOVE_DEST_IF_EMPTY_DIR); boolean renameRemoveEmptyDest = isSupported(RENAME_REMOVE_DEST_IF_EMPTY_DIR);
ContractTestUtils.rm(fs, renameTestDir, true, false); rm(fs, renameTestDir, true, false);
fs.mkdirs(srcDir); fs.mkdirs(srcDir);
fs.mkdirs(finalDir); fs.mkdirs(finalDir);
ContractTestUtils.writeTextFile(fs, new Path(srcDir, "source.txt"), writeTextFile(fs, new Path(srcDir, "source.txt"),
"this is the file in src dir", false); "this is the file in src dir", false);
ContractTestUtils.writeTextFile(fs, new Path(srcSubDir, "subfile.txt"), writeTextFile(fs, new Path(srcSubDir, "subfile.txt"),
"this is the file in src/sub dir", false); "this is the file in src/sub dir", false);
ContractTestUtils.assertPathExists(fs, "not created in src dir", assertPathExists("not created in src dir",
new Path(srcDir, "source.txt")); new Path(srcDir, "source.txt"));
ContractTestUtils.assertPathExists(fs, "not created in src/sub dir", assertPathExists("not created in src/sub dir",
new Path(srcSubDir, "subfile.txt")); new Path(srcSubDir, "subfile.txt"));
fs.rename(srcDir, finalDir); fs.rename(srcDir, finalDir);
// Accept both POSIX rename behavior and CLI rename behavior // Accept both POSIX rename behavior and CLI rename behavior
if (renameRemoveEmptyDest) { if (renameRemoveEmptyDest) {
// POSIX rename behavior // POSIX rename behavior
ContractTestUtils.assertPathExists(fs, "not renamed into dest dir", assertPathExists("not renamed into dest dir",
new Path(finalDir, "source.txt")); new Path(finalDir, "source.txt"));
ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir", assertPathExists("not renamed into dest/sub dir",
new Path(finalDir, "sub/subfile.txt")); new Path(finalDir, "sub/subfile.txt"));
} else { } else {
// CLI rename behavior // CLI rename behavior
ContractTestUtils.assertPathExists(fs, "not renamed into dest dir", assertPathExists("not renamed into dest dir",
new Path(finalDir, "src1/source.txt")); new Path(finalDir, "src1/source.txt"));
ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir", assertPathExists("not renamed into dest/sub dir",
new Path(finalDir, "src1/sub/subfile.txt")); new Path(finalDir, "src1/sub/subfile.txt"));
} }
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted", assertPathDoesNotExist("not deleted",
new Path(srcDir, "source.txt")); new Path(srcDir, "source.txt"));
} }
} }

View File

@ -51,7 +51,7 @@ public void testMkDirDepth1() throws Throwable {
Path dir = new Path("/testmkdirdepth1"); Path dir = new Path("/testmkdirdepth1");
assertPathDoesNotExist("directory already exists", dir); assertPathDoesNotExist("directory already exists", dir);
fs.mkdirs(dir); fs.mkdirs(dir);
ContractTestUtils.assertIsDirectory(getFileSystem(), dir); assertIsDirectory(dir);
assertPathExists("directory already exists", dir); assertPathExists("directory already exists", dir);
assertDeleted(dir, true); assertDeleted(dir, true);
} }
@ -61,10 +61,10 @@ public void testRmEmptyRootDirNonRecursive() throws Throwable {
//extra sanity checks here to avoid support calls about complete loss of data //extra sanity checks here to avoid support calls about complete loss of data
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED); skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
Path root = new Path("/"); Path root = new Path("/");
ContractTestUtils.assertIsDirectory(getFileSystem(), root); assertIsDirectory(root);
boolean deleted = getFileSystem().delete(root, true); boolean deleted = getFileSystem().delete(root, true);
LOG.info("rm / of empty dir result is {}", deleted); LOG.info("rm / of empty dir result is {}", deleted);
ContractTestUtils.assertIsDirectory(getFileSystem(), root); assertIsDirectory(root);
} }
@Test @Test
@ -75,7 +75,7 @@ public void testRmNonEmptyRootDirNonRecursive() throws Throwable {
String touchfile = "/testRmNonEmptyRootDirNonRecursive"; String touchfile = "/testRmNonEmptyRootDirNonRecursive";
Path file = new Path(touchfile); Path file = new Path(touchfile);
ContractTestUtils.touch(getFileSystem(), file); ContractTestUtils.touch(getFileSystem(), file);
ContractTestUtils.assertIsDirectory(getFileSystem(), root); assertIsDirectory(root);
try { try {
boolean deleted = getFileSystem().delete(root, false); boolean deleted = getFileSystem().delete(root, false);
fail("non recursive delete should have raised an exception," + fail("non recursive delete should have raised an exception," +
@ -86,7 +86,7 @@ public void testRmNonEmptyRootDirNonRecursive() throws Throwable {
} finally { } finally {
getFileSystem().delete(file, false); getFileSystem().delete(file, false);
} }
ContractTestUtils.assertIsDirectory(getFileSystem(), root); assertIsDirectory(root);
} }
@Test @Test
@ -94,11 +94,11 @@ public void testRmRootRecursive() throws Throwable {
//extra sanity checks here to avoid support calls about complete loss of data //extra sanity checks here to avoid support calls about complete loss of data
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED); skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
Path root = new Path("/"); Path root = new Path("/");
ContractTestUtils.assertIsDirectory(getFileSystem(), root); assertIsDirectory(root);
Path file = new Path("/testRmRootRecursive"); Path file = new Path("/testRmRootRecursive");
ContractTestUtils.touch(getFileSystem(), file); ContractTestUtils.touch(getFileSystem(), file);
boolean deleted = getFileSystem().delete(root, true); boolean deleted = getFileSystem().delete(root, true);
ContractTestUtils.assertIsDirectory(getFileSystem(), root); assertIsDirectory(root);
LOG.info("rm -rf / result is {}", deleted); LOG.info("rm -rf / result is {}", deleted);
if (deleted) { if (deleted) {
assertPathDoesNotExist("expected file to be deleted", file); assertPathDoesNotExist("expected file to be deleted", file);

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.junit.Test; import org.junit.Test;
@ -31,9 +32,9 @@
import java.io.IOException; import java.io.IOException;
import java.util.Random; import java.util.Random;
import static org.apache.hadoop.fs.contract.ContractTestUtils.cleanup;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyRead;
@ -46,7 +47,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
public static final int DEFAULT_RANDOM_SEEK_COUNT = 100; public static final int DEFAULT_RANDOM_SEEK_COUNT = 100;
private Path testPath;
private Path smallSeekFile; private Path smallSeekFile;
private Path zeroByteFile; private Path zeroByteFile;
private FSDataInputStream instream; private FSDataInputStream instream;
@ -56,13 +56,13 @@ public void setup() throws Exception {
super.setup(); super.setup();
skipIfUnsupported(SUPPORTS_SEEK); skipIfUnsupported(SUPPORTS_SEEK);
//delete the test directory //delete the test directory
testPath = getContract().getTestPath();
smallSeekFile = path("seekfile.txt"); smallSeekFile = path("seekfile.txt");
zeroByteFile = path("zero.txt"); zeroByteFile = path("zero.txt");
byte[] block = dataset(TEST_FILE_LEN, 0, 255); byte[] block = dataset(TEST_FILE_LEN, 0, 255);
//this file now has a simple rule: offset => value //this file now has a simple rule: offset => value
createFile(getFileSystem(), smallSeekFile, false, block); FileSystem fs = getFileSystem();
touch(getFileSystem(), zeroByteFile); createFile(fs, smallSeekFile, true, block);
touch(fs, zeroByteFile);
} }
@Override @Override
@ -79,6 +79,21 @@ public void teardown() throws Exception {
super.teardown(); super.teardown();
} }
/**
* Skip a test case if the FS doesn't support positioned readable.
* This should hold automatically if the FS supports seek, even
* if it doesn't support seeking past the EOF.
* And, because this test suite requires seek to be supported, the
* feature is automatically assumed to be true unless stated otherwise.
*/
protected void assumeSupportsPositionedReadable() throws IOException {
// because this ,
if (!getContract().isSupported(SUPPORTS_POSITIONED_READABLE, true)) {
skip("Skipping as unsupported feature: "
+ SUPPORTS_POSITIONED_READABLE);
}
}
@Test @Test
public void testSeekZeroByteFile() throws Throwable { public void testSeekZeroByteFile() throws Throwable {
describe("seek and read a 0 byte file"); describe("seek and read a 0 byte file");
@ -282,6 +297,7 @@ public void testSeekBigFile() throws Throwable {
public void testPositionedBulkReadDoesntChangePosition() throws Throwable { public void testPositionedBulkReadDoesntChangePosition() throws Throwable {
describe( describe(
"verify that a positioned read does not change the getPos() value"); "verify that a positioned read does not change the getPos() value");
assumeSupportsPositionedReadable();
Path testSeekFile = path("bigseekfile.txt"); Path testSeekFile = path("bigseekfile.txt");
byte[] block = dataset(65536, 0, 255); byte[] block = dataset(65536, 0, 255);
createFile(getFileSystem(), testSeekFile, false, block); createFile(getFileSystem(), testSeekFile, false, block);
@ -290,8 +306,9 @@ public void testPositionedBulkReadDoesntChangePosition() throws Throwable {
assertTrue(-1 != instream.read()); assertTrue(-1 != instream.read());
assertEquals(40000, instream.getPos()); assertEquals(40000, instream.getPos());
byte[] readBuffer = new byte[256]; int v = 256;
instream.read(128, readBuffer, 0, readBuffer.length); byte[] readBuffer = new byte[v];
assertEquals(v, instream.read(128, readBuffer, 0, v));
//have gone back //have gone back
assertEquals(40000, instream.getPos()); assertEquals(40000, instream.getPos());
//content is the same too //content is the same too
@ -317,12 +334,11 @@ public void testRandomSeeks() throws Throwable {
Path randomSeekFile = path("testrandomseeks.bin"); Path randomSeekFile = path("testrandomseeks.bin");
createFile(getFileSystem(), randomSeekFile, false, buf); createFile(getFileSystem(), randomSeekFile, false, buf);
Random r = new Random(); Random r = new Random();
FSDataInputStream stm = getFileSystem().open(randomSeekFile);
// Record the sequence of seeks and reads which trigger a failure. // Record the sequence of seeks and reads which trigger a failure.
int[] seeks = new int[10]; int[] seeks = new int[10];
int[] reads = new int[10]; int[] reads = new int[10];
try { try (FSDataInputStream stm = getFileSystem().open(randomSeekFile)) {
for (int i = 0; i < limit; i++) { for (int i = 0; i < limit; i++) {
int seekOff = r.nextInt(buf.length); int seekOff = r.nextInt(buf.length);
int toRead = r.nextInt(Math.min(buf.length - seekOff, 32000)); int toRead = r.nextInt(Math.min(buf.length - seekOff, 32000));
@ -336,13 +352,232 @@ public void testRandomSeeks() throws Throwable {
sb.append("Sequence of actions:\n"); sb.append("Sequence of actions:\n");
for (int j = 0; j < seeks.length; j++) { for (int j = 0; j < seeks.length; j++) {
sb.append("seek @ ").append(seeks[j]).append(" ") sb.append("seek @ ").append(seeks[j]).append(" ")
.append("read ").append(reads[j]).append("\n"); .append("read ").append(reads[j]).append("\n");
} }
LOG.error(sb.toString()); LOG.error(sb.toString());
throw afe; throw afe;
} finally {
stm.close();
} }
} }
@Test
public void testReadFullyZeroByteFile() throws Throwable {
describe("readFully against a 0 byte file");
assumeSupportsPositionedReadable();
instream = getFileSystem().open(zeroByteFile);
assertEquals(0, instream.getPos());
byte[] buffer = new byte[1];
instream.readFully(0, buffer, 0, 0);
assertEquals(0, instream.getPos());
// seek to 0 read 0 bytes from it
instream.seek(0);
assertEquals(0, instream.read(buffer, 0, 0));
}
@Test
public void testReadFullyPastEOFZeroByteFile() throws Throwable {
assumeSupportsPositionedReadable();
describe("readFully past the EOF of a 0 byte file");
instream = getFileSystem().open(zeroByteFile);
byte[] buffer = new byte[1];
// try to read past end of file
try {
instream.readFully(0, buffer, 0, 16);
fail("Expected an exception");
} catch (IllegalArgumentException | IndexOutOfBoundsException
| EOFException e) {
// expected
}
}
@Test
public void testReadFullySmallFile() throws Throwable {
describe("readFully operations");
assumeSupportsPositionedReadable();
instream = getFileSystem().open(smallSeekFile);
byte[] buffer = new byte[256];
// expect negative length to fail
try {
instream.readFully(0, buffer, 0, -16);
fail("Expected an exception");
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// negative offset into buffer
try {
instream.readFully(0, buffer, -1, 16);
fail("Expected an exception");
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// expect negative position to fail, ideally with EOF
try {
instream.readFully(-1, buffer);
fail("Expected an exception");
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException |IllegalArgumentException | IndexOutOfBoundsException e) {
handleRelaxedException("readFully with a negative position ",
"EOFException",
e);
}
// read more than the offset allows
try {
instream.readFully(0, buffer, buffer.length - 8, 16);
fail("Expected an exception");
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// read properly
assertEquals(0, instream.getPos());
instream.readFully(0, buffer);
assertEquals(0, instream.getPos());
// now read the entire file in one go
byte[] fullFile = new byte[TEST_FILE_LEN];
instream.readFully(0, fullFile);
assertEquals(0, instream.getPos());
try {
instream.readFully(16, fullFile);
fail("Expected an exception");
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("readFully which reads past EOF ",
"EOFException",
e);
}
}
@Test
public void testReadFullyPastEOF() throws Throwable {
describe("readFully past the EOF of a file");
assumeSupportsPositionedReadable();
instream = getFileSystem().open(smallSeekFile);
byte[] buffer = new byte[256];
// now read past the end of the file
try {
instream.readFully(TEST_FILE_LEN + 1, buffer);
fail("Expected an exception");
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("readFully with an offset past EOF ",
"EOFException",
e);
}
// read zero bytes from an offset past EOF.
try {
instream.readFully(TEST_FILE_LEN + 1, buffer, 0, 0);
// a zero byte read may fail-fast
LOG.info("Filesystem short-circuits 0-byte reads");
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("readFully(0 bytes) with an offset past EOF ",
"EOFException",
e);
}
}
@Test
public void testReadFullyZeroBytebufferPastEOF() throws Throwable {
describe("readFully zero bytes from an offset past EOF");
assumeSupportsPositionedReadable();
instream = getFileSystem().open(smallSeekFile);
byte[] buffer = new byte[256];
try {
instream.readFully(TEST_FILE_LEN + 1, buffer, 0, 0);
// a zero byte read may fail-fast
LOG.info("Filesystem short-circuits 0-byte reads");
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("readFully(0 bytes) with an offset past EOF ",
"EOFException",
e);
}
}
@Test
public void testReadNullBuffer() throws Throwable {
describe("try to read a null buffer ");
assumeSupportsPositionedReadable();
try (FSDataInputStream in = getFileSystem().open(smallSeekFile)) {
// Null buffer
int r = in.read(0, null, 0, 16);
fail("Expected an exception from a read into a null buffer, got " + r);
} catch (IllegalArgumentException e) {
// expected
}
}
@Test
public void testReadSmallFile() throws Throwable {
describe("PositionedRead.read operations");
assumeSupportsPositionedReadable();
instream = getFileSystem().open(smallSeekFile);
byte[] buffer = new byte[256];
int r;
// expect negative length to fail
try {
r = instream.read(0, buffer, 0, -16);
fail("Expected an exception, got " + r);
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// negative offset into buffer
try {
r = instream.read(0, buffer, -1, 16);
fail("Expected an exception, got " + r);
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// negative position
try {
r = instream.read(-1, buffer, 0, 16);
fail("Expected an exception, got " + r);
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException | IllegalArgumentException | IndexOutOfBoundsException e) {
handleRelaxedException("read() with a negative position ",
"EOFException",
e);
}
// read more than the offset allows
try {
r = instream.read(0, buffer, buffer.length - 8, 16);
fail("Expected an exception, got " + r);
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// read properly
assertEquals(0, instream.getPos());
instream.readFully(0, buffer);
assertEquals(0, instream.getPos());
// now read the entire file in one go
byte[] fullFile = new byte[TEST_FILE_LEN];
assertEquals(TEST_FILE_LEN,
instream.read(0, fullFile, 0, fullFile.length));
assertEquals(0, instream.getPos());
// now read past the end of the file
assertEquals(-1,
instream.read(TEST_FILE_LEN + 16, buffer, 0, 1));
}
@Test
public void testReadAtExactEOF() throws Throwable {
describe("read at the end of the file");
instream = getFileSystem().open(smallSeekFile);
instream.seek(TEST_FILE_LEN -1);
assertTrue("read at last byte", instream.read() > 0);
assertEquals("read just past EOF", -1, instream.read());
}
} }

View File

@ -57,7 +57,7 @@ public abstract class AbstractFSContractTestBase extends Assert
public static final int DEFAULT_TEST_TIMEOUT = 180 * 1000; public static final int DEFAULT_TEST_TIMEOUT = 180 * 1000;
/** /**
* The FS contract used for these tets * The FS contract used for these tests
*/ */
private AbstractFSContract contract; private AbstractFSContract contract;

View File

@ -53,20 +53,20 @@ public interface ContractOptions {
/** /**
* Flag to indicate that the FS can rename into directories that * Flag to indicate that the FS can rename into directories that
* don't exist, creating them as needed. * don't exist, creating them as needed.
* @{value} * {@value}
*/ */
String RENAME_CREATES_DEST_DIRS = "rename-creates-dest-dirs"; String RENAME_CREATES_DEST_DIRS = "rename-creates-dest-dirs";
/** /**
* Flag to indicate that the FS does not follow the rename contract -and * Flag to indicate that the FS does not follow the rename contract -and
* instead only returns false on a failure. * instead only returns false on a failure.
* @{value} * {@value}
*/ */
String RENAME_OVERWRITES_DEST = "rename-overwrites-dest"; String RENAME_OVERWRITES_DEST = "rename-overwrites-dest";
/** /**
* Flag to indicate that the FS returns false if the destination exists * Flag to indicate that the FS returns false if the destination exists
* @{value} * {@value}
*/ */
String RENAME_RETURNS_FALSE_IF_DEST_EXISTS = String RENAME_RETURNS_FALSE_IF_DEST_EXISTS =
"rename-returns-false-if-dest-exists"; "rename-returns-false-if-dest-exists";
@ -74,7 +74,7 @@ public interface ContractOptions {
/** /**
* Flag to indicate that the FS returns false on a rename * Flag to indicate that the FS returns false on a rename
* if the source is missing * if the source is missing
* @{value} * {@value}
*/ */
String RENAME_RETURNS_FALSE_IF_SOURCE_MISSING = String RENAME_RETURNS_FALSE_IF_SOURCE_MISSING =
"rename-returns-false-if-source-missing"; "rename-returns-false-if-source-missing";
@ -82,74 +82,74 @@ public interface ContractOptions {
/** /**
* Flag to indicate that the FS remove dest first if it is an empty directory * Flag to indicate that the FS remove dest first if it is an empty directory
* mean the FS honors POSIX rename behavior. * mean the FS honors POSIX rename behavior.
* @{value} * {@value}
*/ */
String RENAME_REMOVE_DEST_IF_EMPTY_DIR = "rename-remove-dest-if-empty-dir"; String RENAME_REMOVE_DEST_IF_EMPTY_DIR = "rename-remove-dest-if-empty-dir";
/** /**
* Flag to indicate that append is supported * Flag to indicate that append is supported
* @{value} * {@value}
*/ */
String SUPPORTS_APPEND = "supports-append"; String SUPPORTS_APPEND = "supports-append";
/** /**
* Flag to indicate that setTimes is supported. * Flag to indicate that setTimes is supported.
* @{value} * {@value}
*/ */
String SUPPORTS_SETTIMES = "supports-settimes"; String SUPPORTS_SETTIMES = "supports-settimes";
/** /**
* Flag to indicate that getFileStatus is supported. * Flag to indicate that getFileStatus is supported.
* @{value} * {@value}
*/ */
String SUPPORTS_GETFILESTATUS = "supports-getfilestatus"; String SUPPORTS_GETFILESTATUS = "supports-getfilestatus";
/** /**
* Flag to indicate that renames are atomic * Flag to indicate that renames are atomic
* @{value} * {@value}
*/ */
String SUPPORTS_ATOMIC_RENAME = "supports-atomic-rename"; String SUPPORTS_ATOMIC_RENAME = "supports-atomic-rename";
/** /**
* Flag to indicate that directory deletes are atomic * Flag to indicate that directory deletes are atomic
* @{value} * {@value}
*/ */
String SUPPORTS_ATOMIC_DIRECTORY_DELETE = "supports-atomic-directory-delete"; String SUPPORTS_ATOMIC_DIRECTORY_DELETE = "supports-atomic-directory-delete";
/** /**
* Does the FS support multiple block locations? * Does the FS support multiple block locations?
* @{value} * {@value}
*/ */
String SUPPORTS_BLOCK_LOCALITY = "supports-block-locality"; String SUPPORTS_BLOCK_LOCALITY = "supports-block-locality";
/** /**
* Does the FS support the concat() operation? * Does the FS support the concat() operation?
* @{value} * {@value}
*/ */
String SUPPORTS_CONCAT = "supports-concat"; String SUPPORTS_CONCAT = "supports-concat";
/** /**
* Is seeking supported at all? * Is seeking supported at all?
* @{value} * {@value}
*/ */
String SUPPORTS_SEEK = "supports-seek"; String SUPPORTS_SEEK = "supports-seek";
/** /**
* Is seeking past the EOF allowed? * Is seeking past the EOF allowed?
* @{value} * {@value}
*/ */
String REJECTS_SEEK_PAST_EOF = "rejects-seek-past-eof"; String REJECTS_SEEK_PAST_EOF = "rejects-seek-past-eof";
/** /**
* Is seeking on a closed file supported? Some filesystems only raise an * Is seeking on a closed file supported? Some filesystems only raise an
* exception later, when trying to read. * exception later, when trying to read.
* @{value} * {@value}
*/ */
String SUPPORTS_SEEK_ON_CLOSED_FILE = "supports-seek-on-closed-file"; String SUPPORTS_SEEK_ON_CLOSED_FILE = "supports-seek-on-closed-file";
/** /**
* Is available() on a closed InputStream supported? * Is available() on a closed InputStream supported?
* @{value} * {@value}
*/ */
String SUPPORTS_AVAILABLE_ON_CLOSED_FILE = "supports-available-on-closed-file"; String SUPPORTS_AVAILABLE_ON_CLOSED_FILE = "supports-available-on-closed-file";
@ -157,32 +157,39 @@ public interface ContractOptions {
* Flag to indicate that this FS expects to throw the strictest * Flag to indicate that this FS expects to throw the strictest
* exceptions it can, not generic IOEs, which, if returned, * exceptions it can, not generic IOEs, which, if returned,
* must be rejected. * must be rejected.
* @{value} * {@value}
*/ */
String SUPPORTS_STRICT_EXCEPTIONS = "supports-strict-exceptions"; String SUPPORTS_STRICT_EXCEPTIONS = "supports-strict-exceptions";
/** /**
* Are unix permissions * Are unix permissions
* @{value} * {@value}
*/ */
String SUPPORTS_UNIX_PERMISSIONS = "supports-unix-permissions"; String SUPPORTS_UNIX_PERMISSIONS = "supports-unix-permissions";
/**
* Is positioned readable supported? Supporting seek should be sufficient
* for this.
* {@value}
*/
String SUPPORTS_POSITIONED_READABLE = "supports-positioned-readable";
/** /**
* Maximum path length * Maximum path length
* @{value} * {@value}
*/ */
String MAX_PATH_ = "max-path"; String MAX_PATH_ = "max-path";
/** /**
* Maximum filesize: 0 or -1 for no limit * Maximum filesize: 0 or -1 for no limit
* @{value} * {@value}
*/ */
String MAX_FILESIZE = "max-filesize"; String MAX_FILESIZE = "max-filesize";
/** /**
* Flag to indicate that tests on the root directories of a filesystem/ * Flag to indicate that tests on the root directories of a filesystem/
* object store are permitted * object store are permitted
* @{value} * {@value}
*/ */
String TEST_ROOT_TESTS_ENABLED = "test.root-tests-enabled"; String TEST_ROOT_TESTS_ENABLED = "test.root-tests-enabled";

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.internal.AssumptionViolatedException; import org.junit.internal.AssumptionViolatedException;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -432,9 +433,7 @@ public static void skip(String message) {
* @throws AssertionError with the text and throwable -always * @throws AssertionError with the text and throwable -always
*/ */
public static void fail(String text, Throwable thrown) { public static void fail(String text, Throwable thrown) {
AssertionError e = new AssertionError(text); throw new AssertionError(text, thrown);
e.initCause(thrown);
throw e;
} }
/** /**
@ -509,10 +508,14 @@ public static void createFile(FileSystem fs,
boolean overwrite, boolean overwrite,
byte[] data) throws IOException { byte[] data) throws IOException {
FSDataOutputStream stream = fs.create(path, overwrite); FSDataOutputStream stream = fs.create(path, overwrite);
if (data != null && data.length > 0) { try {
stream.write(data); if (data != null && data.length > 0) {
stream.write(data);
}
stream.close();
} finally {
IOUtils.closeStream(stream);
} }
stream.close();
} }
/** /**
@ -574,13 +577,10 @@ public static void assertDeleted(FileSystem fs,
public static String readBytesToString(FileSystem fs, public static String readBytesToString(FileSystem fs,
Path path, Path path,
int length) throws IOException { int length) throws IOException {
FSDataInputStream in = fs.open(path); try (FSDataInputStream in = fs.open(path)) {
try {
byte[] buf = new byte[length]; byte[] buf = new byte[length];
in.readFully(0, buf); in.readFully(0, buf);
return toChar(buf); return toChar(buf);
} finally {
in.close();
} }
} }
@ -786,8 +786,7 @@ public static void verifyReceivedData(FileSystem fs, Path path,
long totalBytesRead = 0; long totalBytesRead = 0;
int nextExpectedNumber = 0; int nextExpectedNumber = 0;
final InputStream inputStream = fs.open(path); try (InputStream inputStream = fs.open(path)) {
try {
while (true) { while (true) {
final int bytesRead = inputStream.read(testBuffer); final int bytesRead = inputStream.read(testBuffer);
if (bytesRead < 0) { if (bytesRead < 0) {
@ -814,8 +813,6 @@ public static void verifyReceivedData(FileSystem fs, Path path,
throw new IOException("Expected to read " + expectedSize + throw new IOException("Expected to read " + expectedSize +
" bytes but only received " + totalBytesRead); " bytes but only received " + totalBytesRead);
} }
} finally {
inputStream.close();
} }
} }

View File

@ -100,7 +100,7 @@ case sensitivity and permission options are determined at run time from OS type
<value>true</value> <value>true</value>
</property> </property>
<!-- checksum FS doesn't allow seeing past EOF --> <!-- checksum FS doesn't allow seeking past EOF -->
<property> <property>
<name>fs.contract.rejects-seek-past-eof</name> <name>fs.contract.rejects-seek-past-eof</name>
<value>true</value> <value>true</value>

View File

@ -1473,6 +1473,10 @@ protected static boolean tokenRefetchNeeded(IOException ex,
@Override @Override
public int read(long position, byte[] buffer, int offset, int length) public int read(long position, byte[] buffer, int offset, int length)
throws IOException { throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return 0;
}
try (TraceScope scope = dfsClient. try (TraceScope scope = dfsClient.
newReaderTraceScope("DFSInputStream#byteArrayPread", newReaderTraceScope("DFSInputStream#byteArrayPread",
src, position, length)) { src, position, length)) {

View File

@ -28,6 +28,7 @@
import java.util.StringTokenizer; import java.util.StringTokenizer;
import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
@ -129,6 +130,9 @@ protected InputStream getInputStream() throws IOException {
@VisibleForTesting @VisibleForTesting
protected InputStreamAndFileLength openInputStream(long startOffset) protected InputStreamAndFileLength openInputStream(long startOffset)
throws IOException { throws IOException {
if (startOffset < 0) {
throw new EOFException("Negative Position");
}
// Use the original url if no resolved url exists, eg. if // Use the original url if no resolved url exists, eg. if
// it's the first time a request is made. // it's the first time a request is made.
final boolean resolved = resolvedURL.getURL() != null; final boolean resolved = resolvedURL.getURL() != null;
@ -250,6 +254,10 @@ public void seek(long pos) throws IOException {
@Override @Override
public int read(long position, byte[] buffer, int offset, int length) public int read(long position, byte[] buffer, int offset, int length)
throws IOException { throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return 0;
}
try (InputStream in = openInputStream(position).in) { try (InputStream in = openInputStream(position).in) {
return in.read(buffer, offset, length); return in.read(buffer, offset, length);
} }
@ -258,17 +266,21 @@ public int read(long position, byte[] buffer, int offset, int length)
@Override @Override
public void readFully(long position, byte[] buffer, int offset, int length) public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException { throws IOException {
final InputStreamAndFileLength fin = openInputStream(position); validatePositionedReadArgs(position, buffer, offset, length);
if (fin.length != null && length + position > fin.length) { if (length == 0) {
throw new EOFException("The length to read " + length return;
+ " exceeds the file length " + fin.length);
} }
final InputStreamAndFileLength fin = openInputStream(position);
try { try {
if (fin.length != null && length + position > fin.length) {
throw new EOFException("The length to read " + length
+ " exceeds the file length " + fin.length);
}
int nread = 0; int nread = 0;
while (nread < length) { while (nread < length) {
int nbytes = fin.in.read(buffer, offset + nread, length - nread); int nbytes = fin.in.read(buffer, offset + nread, length - nread);
if (nbytes < 0) { if (nbytes < 0) {
throw new EOFException("End of file reached before reading fully."); throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
} }
nread += nbytes; nread += nbytes;
} }

View File

@ -137,6 +137,8 @@ public synchronized int read() throws IOException {
LOG.info("Got socket exception while trying to read from stream, trying to recover " + e); LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
reopen(pos); reopen(pos);
byteRead = wrappedStream.read(); byteRead = wrappedStream.read();
} catch (EOFException e) {
return -1;
} }
if (byteRead >= 0) { if (byteRead >= 0) {
@ -216,4 +218,42 @@ public synchronized int available() throws IOException {
public boolean markSupported() { public boolean markSupported() {
return false; return false;
} }
/**
* Subclass {@code readFully()} operation which only seeks at the start
* of the series of operations; seeking back at the end.
*
* This is significantly higher performance if multiple read attempts are
* needed to fetch the data, as it does not break the HTTP connection.
*
* To maintain thread safety requirements, this operation is synchronized
* for the duration of the sequence.
* {@inheritDoc}
*
*/
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return;
}
int nread = 0;
synchronized (this) {
long oldPos = getPos();
try {
seek(position);
while (nread < length) {
int nbytes = read(buffer, offset + nread, length - nread);
if (nbytes < 0) {
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
nread += nbytes;
}
} finally {
seek(oldPos);
}
}
}
} }

View File

@ -725,6 +725,8 @@ public synchronized int read() throws FileNotFoundException, IOException {
// Return to the caller with the result. // Return to the caller with the result.
// //
return result; return result;
} catch(EOFException e) {
return -1;
} catch(IOException e) { } catch(IOException e) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e); Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
@ -773,7 +775,7 @@ public synchronized int read(byte[] b, int off, int len) throws FileNotFoundExce
pos += result; pos += result;
} }
if (null != statistics) { if (null != statistics && result > 0) {
statistics.incrementBytesRead(result); statistics.incrementBytesRead(result);
} }