HADOOP-12994. Specify PositionedReadable, add contract tests, fix problems. Contributed by Steve Loughran.
(cherry picked from commit843ee8d59d
) (cherry picked from commit26a23eff82
) Conflicts: hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
This commit is contained in:
parent
4cf0b1932f
commit
1b5b85e560
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.crypto;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FileDescriptor;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FilterInputStream;
|
||||
|
@ -34,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
import org.apache.hadoop.fs.ByteBufferReadable;
|
||||
import org.apache.hadoop.fs.CanSetDropBehind;
|
||||
import org.apache.hadoop.fs.CanSetReadahead;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
|
||||
import org.apache.hadoop.fs.HasFileDescriptor;
|
||||
import org.apache.hadoop.fs.PositionedReadable;
|
||||
|
@ -395,7 +397,9 @@ public class CryptoInputStream extends FilterInputStream implements
|
|||
/** Seek to a position. */
|
||||
@Override
|
||||
public void seek(long pos) throws IOException {
|
||||
Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
|
||||
if (pos < 0) {
|
||||
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
|
||||
}
|
||||
checkStream();
|
||||
try {
|
||||
/*
|
||||
|
|
|
@ -182,20 +182,18 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
|
|||
public int read(long position, byte[] b, int off, int len)
|
||||
throws IOException {
|
||||
// parameter check
|
||||
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
|
||||
throw new IndexOutOfBoundsException();
|
||||
} else if (len == 0) {
|
||||
validatePositionedReadArgs(position, b, off, len);
|
||||
if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
if( position<0 ) {
|
||||
throw new IllegalArgumentException(
|
||||
"Parameter position can not to be negative");
|
||||
}
|
||||
|
||||
ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
|
||||
checker.seek(position);
|
||||
int nread = checker.read(b, off, len);
|
||||
checker.close();
|
||||
int nread;
|
||||
try (ChecksumFSInputChecker checker =
|
||||
new ChecksumFSInputChecker(fs, file)) {
|
||||
checker.seek(position);
|
||||
nread = checker.read(b, off, len);
|
||||
checker.close();
|
||||
}
|
||||
return nread;
|
||||
}
|
||||
|
||||
|
|
|
@ -169,20 +169,18 @@ public abstract class ChecksumFs extends FilterFs {
|
|||
public int read(long position, byte[] b, int off, int len)
|
||||
throws IOException, UnresolvedLinkException {
|
||||
// parameter check
|
||||
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
|
||||
throw new IndexOutOfBoundsException();
|
||||
} else if (len == 0) {
|
||||
validatePositionedReadArgs(position, b, off, len);
|
||||
if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (position<0) {
|
||||
throw new IllegalArgumentException(
|
||||
"Parameter position can not to be negative");
|
||||
}
|
||||
|
||||
ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
|
||||
checker.seek(position);
|
||||
int nread = checker.read(b, off, len);
|
||||
checker.close();
|
||||
int nread;
|
||||
try (ChecksumFSInputChecker checker =
|
||||
new ChecksumFSInputChecker(fs, file)) {
|
||||
checker.seek(position);
|
||||
nread = checker.read(b, off, len);
|
||||
checker.close();
|
||||
}
|
||||
return nread;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,18 +18,21 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.FileDescriptor;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.io.ByteBufferPool;
|
||||
import org.apache.hadoop.fs.ByteBufferUtil;
|
||||
import org.apache.hadoop.util.IdentityHashStore;
|
||||
|
||||
/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
|
||||
* and buffers input through a {@link BufferedInputStream}. */
|
||||
* and buffers input through a {@link java.io.BufferedInputStream}. */
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public class FSDataInputStream extends DataInputStream
|
||||
|
@ -97,6 +100,7 @@ public class FSDataInputStream extends DataInputStream
|
|||
* @param buffer buffer into which data is read
|
||||
* @param offset offset into the buffer in which data is written
|
||||
* @param length the number of bytes to read
|
||||
* @throws IOException IO problems
|
||||
* @throws EOFException If the end of stream is reached while reading.
|
||||
* If an exception is thrown an undetermined number
|
||||
* of bytes in the buffer may have been written.
|
||||
|
|
|
@ -40,4 +40,10 @@ public class FSExceptionMessages {
|
|||
*/
|
||||
public static final String CANNOT_SEEK_PAST_EOF =
|
||||
"Attempted to seek or read past the end of the file";
|
||||
|
||||
public static final String EOF_IN_READ_FULLY =
|
||||
"End of file reached before reading fully.";
|
||||
|
||||
public static final String TOO_MANY_BYTES_FOR_DEST_BUFFER
|
||||
= "Requested more bytes than destination buffer size";
|
||||
}
|
||||
|
|
|
@ -17,22 +17,28 @@
|
|||
*/
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.ZeroCopyUnavailableException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/****************************************************************
|
||||
* FSInputStream is a generic old InputStream with a little bit
|
||||
* of RAF-style seek ability.
|
||||
*
|
||||
*****************************************************************/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS"})
|
||||
@InterfaceStability.Unstable
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class FSInputStream extends InputStream
|
||||
implements Seekable, PositionedReadable {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FSInputStream.class);
|
||||
|
||||
/**
|
||||
* Seek to the given offset from the start of the file.
|
||||
* The next read() will be from that location. Can't
|
||||
|
@ -57,12 +63,21 @@ public abstract class FSInputStream extends InputStream
|
|||
@Override
|
||||
public int read(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
validatePositionedReadArgs(position, buffer, offset, length);
|
||||
if (length == 0) {
|
||||
return 0;
|
||||
}
|
||||
synchronized (this) {
|
||||
long oldPos = getPos();
|
||||
int nread = -1;
|
||||
try {
|
||||
seek(position);
|
||||
nread = read(buffer, offset, length);
|
||||
} catch (EOFException e) {
|
||||
// end of file; this can be raised by some filesystems
|
||||
// (often: object stores); it is swallowed here.
|
||||
LOG.debug("Downgrading EOFException raised trying to" +
|
||||
" read {} bytes at offset {}", length, offset, e);
|
||||
} finally {
|
||||
seek(oldPos);
|
||||
}
|
||||
|
@ -70,14 +85,42 @@ public abstract class FSInputStream extends InputStream
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validation code, available for use in subclasses.
|
||||
* @param position position: if negative an EOF exception is raised
|
||||
* @param buffer destination buffer
|
||||
* @param offset offset within the buffer
|
||||
* @param length length of bytes to read
|
||||
* @throws EOFException if the position is negative
|
||||
* @throws IndexOutOfBoundsException if there isn't space for the amount of
|
||||
* data requested.
|
||||
* @throws IllegalArgumentException other arguments are invalid.
|
||||
*/
|
||||
protected void validatePositionedReadArgs(long position,
|
||||
byte[] buffer, int offset, int length) throws EOFException {
|
||||
Preconditions.checkArgument(length >= 0, "length is negative");
|
||||
if (position < 0) {
|
||||
throw new EOFException("position is negative");
|
||||
}
|
||||
Preconditions.checkArgument(buffer != null, "Null buffer");
|
||||
if (buffer.length - offset < length) {
|
||||
throw new IndexOutOfBoundsException(
|
||||
FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFully(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
validatePositionedReadArgs(position, buffer, offset, length);
|
||||
int nread = 0;
|
||||
while (nread < length) {
|
||||
int nbytes = read(position+nread, buffer, offset+nread, length-nread);
|
||||
int nbytes = read(position + nread,
|
||||
buffer,
|
||||
offset + nread,
|
||||
length - nread);
|
||||
if (nbytes < 0) {
|
||||
throw new EOFException("End of file reached before reading fully.");
|
||||
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
|
||||
}
|
||||
nread += nbytes;
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
|
|||
import org.apache.hadoop.util.LineReader;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
|
@ -1053,17 +1054,16 @@ public class HarFileSystem extends FileSystem {
|
|||
@Override
|
||||
public void readFully(long pos, byte[] b, int offset, int length)
|
||||
throws IOException {
|
||||
validatePositionedReadArgs(pos, b, offset, length);
|
||||
if (length == 0) {
|
||||
return;
|
||||
}
|
||||
if (start + length + pos > end) {
|
||||
throw new IOException("Not enough bytes to read.");
|
||||
throw new EOFException("Not enough bytes to read.");
|
||||
}
|
||||
underLyingStream.readFully(pos + start, b, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFully(long pos, byte[] b) throws IOException {
|
||||
readFully(pos, b, 0, b.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setReadahead(Long readahead) throws IOException {
|
||||
underLyingStream.setReadahead(readahead);
|
||||
|
|
|
@ -22,30 +22,67 @@ import java.io.*;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/** Stream that permits positional reading. */
|
||||
/**
|
||||
* Stream that permits positional reading.
|
||||
*
|
||||
* Implementations are required to implement thread-safe operations; this may
|
||||
* be supported by concurrent access to the data, or by using a synchronization
|
||||
* mechanism to serialize access.
|
||||
*
|
||||
* Not all implementations meet this requirement. Those that do not cannot
|
||||
* be used as a backing store for some applications, such as Apache HBase.
|
||||
*
|
||||
* Independent of whether or not they are thread safe, some implementations
|
||||
* may make the intermediate state of the system, specifically the position
|
||||
* obtained in {@code Seekable.getPos()} visible.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public interface PositionedReadable {
|
||||
/**
|
||||
* Read upto the specified number of bytes, from a given
|
||||
* Read up to the specified number of bytes, from a given
|
||||
* position within a file, and return the number of bytes read. This does not
|
||||
* change the current offset of a file, and is thread-safe.
|
||||
*
|
||||
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
|
||||
* @param position position within file
|
||||
* @param buffer destination buffer
|
||||
* @param offset offset in the buffer
|
||||
* @param length number of bytes to read
|
||||
* @return actual number of bytes read; -1 means "none"
|
||||
* @throws IOException IO problems.
|
||||
*/
|
||||
public int read(long position, byte[] buffer, int offset, int length)
|
||||
int read(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Read the specified number of bytes, from a given
|
||||
* position within a file. This does not
|
||||
* change the current offset of a file, and is thread-safe.
|
||||
*
|
||||
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
|
||||
* @param position position within file
|
||||
* @param buffer destination buffer
|
||||
* @param offset offset in the buffer
|
||||
* @param length number of bytes to read
|
||||
* @throws IOException IO problems.
|
||||
* @throws EOFException the end of the data was reached before
|
||||
* the read operation completed
|
||||
*/
|
||||
public void readFully(long position, byte[] buffer, int offset, int length)
|
||||
void readFully(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Read number of bytes equal to the length of the buffer, from a given
|
||||
* position within a file. This does not
|
||||
* change the current offset of a file, and is thread-safe.
|
||||
*
|
||||
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
|
||||
* @param position position within file
|
||||
* @param buffer destination buffer
|
||||
* @throws IOException IO problems.
|
||||
* @throws EOFException the end of the data was reached before
|
||||
* the read operation completed
|
||||
*/
|
||||
public void readFully(long position, byte[] buffer) throws IOException;
|
||||
void readFully(long position, byte[] buffer) throws IOException;
|
||||
}
|
||||
|
|
|
@ -157,6 +157,8 @@ public class RawLocalFileSystem extends FileSystem {
|
|||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
// parameter check
|
||||
validatePositionedReadArgs(position, b, off, len);
|
||||
try {
|
||||
int value = fis.read(b, off, len);
|
||||
if (value > 0) {
|
||||
|
@ -172,6 +174,12 @@ public class RawLocalFileSystem extends FileSystem {
|
|||
@Override
|
||||
public int read(long position, byte[] b, int off, int len)
|
||||
throws IOException {
|
||||
// parameter check
|
||||
validatePositionedReadArgs(position, b, off, len);
|
||||
if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ByteBuffer bb = ByteBuffer.wrap(b, off, len);
|
||||
try {
|
||||
int value = fis.getChannel().read(bb, position);
|
||||
|
|
|
@ -279,6 +279,9 @@ on the underlying stream:
|
|||
read(dest3, ... len3) -> dest3[0..len3 - 1] =
|
||||
[data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]
|
||||
|
||||
Note that implementations are not required to be atomic; the intermediate state
|
||||
of the operation (the change in the value of `getPos()`) may be visible.
|
||||
|
||||
#### Implementation preconditions
|
||||
|
||||
Not all `FSDataInputStream` implementations support these operations. Those that do
|
||||
|
@ -287,7 +290,7 @@ interface.
|
|||
|
||||
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
|
||||
|
||||
This could be considered obvious: if a stream is not Seekable, a client
|
||||
This could be considered obvious: if a stream is not `Seekable`, a client
|
||||
cannot seek to a location. It is also a side effect of the
|
||||
base class implementation, which uses `Seekable.seek()`.
|
||||
|
||||
|
@ -304,14 +307,14 @@ For any operations that fail, the contents of the destination
|
|||
`buffer` are undefined. Implementations may overwrite part
|
||||
or all of the buffer before reporting a failure.
|
||||
|
||||
|
||||
|
||||
### `int PositionedReadable.read(position, buffer, offset, length)`
|
||||
|
||||
Read as much data as possible into the buffer space allocated for it.
|
||||
|
||||
#### Preconditions
|
||||
|
||||
position > 0 else raise [IllegalArgumentException, RuntimeException]
|
||||
len(buffer) + offset < len(data) else raise [IndexOutOfBoundException, RuntimeException]
|
||||
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
|
||||
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
|
||||
length >= 0
|
||||
offset >= 0
|
||||
|
||||
|
@ -324,23 +327,36 @@ of data available from the specified position:
|
|||
buffer'[offset..(offset+available-1)] = data[position..position+available -1]
|
||||
result = available
|
||||
|
||||
1. A return value of -1 means that the stream had no more available data.
|
||||
1. An invocation with `length==0` implicitly does not read any data;
|
||||
implementations may short-cut the operation and omit any IO. In such instances,
|
||||
checks for the stream being at the end of the file may be omitted.
|
||||
1. If an IO exception occurs during the read operation(s),
|
||||
the final state of `buffer` is undefined.
|
||||
|
||||
### `void PositionedReadable.readFully(position, buffer, offset, length)`
|
||||
|
||||
Read exactly `length` bytes of data into the buffer, failing if there is not
|
||||
enough data available.
|
||||
|
||||
#### Preconditions
|
||||
|
||||
position > 0 else raise [IllegalArgumentException, RuntimeException]
|
||||
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
|
||||
length >= 0
|
||||
offset >= 0
|
||||
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
|
||||
(position + length) <= len(data) else raise [EOFException, IOException]
|
||||
len(buffer) + offset < len(data)
|
||||
|
||||
If an IO exception occurs during the read operation(s),
|
||||
the final state of `buffer` is undefined.
|
||||
|
||||
If there is not enough data in the input stream to satisfy the requests,
|
||||
the final state of `buffer` is undefined.
|
||||
|
||||
#### Postconditions
|
||||
|
||||
The amount of data read is the less of the length or the amount
|
||||
of data available from the specified position:
|
||||
The buffer from offset `offset` is filled with the data starting at `position`
|
||||
|
||||
let available = min(length, len(data)-position)
|
||||
buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]
|
||||
|
||||
### `PositionedReadable.readFully(position, buffer)`
|
||||
|
@ -349,6 +365,9 @@ The semantics of this are exactly equivalent to
|
|||
|
||||
readFully(position, buffer, 0, len(buffer))
|
||||
|
||||
That is, the buffer is filled entirely with the contents of the input source
|
||||
from position `position`
|
||||
|
||||
|
||||
## Consistency
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.crypto;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
@ -29,6 +30,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.ByteBufferReadable;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
|
||||
import org.apache.hadoop.fs.PositionedReadable;
|
||||
import org.apache.hadoop.fs.ReadOption;
|
||||
|
@ -339,7 +341,7 @@ public abstract class CryptoStreamsTestBase {
|
|||
try {
|
||||
((PositionedReadable) in).readFully(pos, result);
|
||||
Assert.fail("Read fully exceeds maximum length should fail.");
|
||||
} catch (IOException e) {
|
||||
} catch (EOFException e) {
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -365,9 +367,9 @@ public abstract class CryptoStreamsTestBase {
|
|||
try {
|
||||
seekCheck(in, -3);
|
||||
Assert.fail("Seek to negative offset should fail.");
|
||||
} catch (IllegalArgumentException e) {
|
||||
GenericTestUtils.assertExceptionContains("Cannot seek to negative " +
|
||||
"offset", e);
|
||||
} catch (EOFException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
FSExceptionMessages.NEGATIVE_SEEK, e);
|
||||
}
|
||||
Assert.assertEquals(pos, ((Seekable) in).getPos());
|
||||
|
||||
|
|
|
@ -52,11 +52,8 @@ public abstract class AbstractContractAppendTest extends AbstractFSContractTestB
|
|||
public void testAppendToEmptyFile() throws Throwable {
|
||||
touch(getFileSystem(), target);
|
||||
byte[] dataset = dataset(256, 'a', 'z');
|
||||
FSDataOutputStream outputStream = getFileSystem().append(target);
|
||||
try {
|
||||
try (FSDataOutputStream outputStream = getFileSystem().append(target)) {
|
||||
outputStream.write(dataset);
|
||||
} finally {
|
||||
outputStream.close();
|
||||
}
|
||||
byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target,
|
||||
dataset.length);
|
||||
|
|
|
@ -53,7 +53,7 @@ public abstract class AbstractContractConcatTest extends AbstractFSContractTestB
|
|||
target = new Path(testPath, "target");
|
||||
|
||||
byte[] block = dataset(TEST_FILE_LEN, 0, 255);
|
||||
createFile(getFileSystem(), srcFile, false, block);
|
||||
createFile(getFileSystem(), srcFile, true, block);
|
||||
touch(getFileSystem(), zeroByteFile);
|
||||
}
|
||||
|
||||
|
|
|
@ -123,7 +123,7 @@ public abstract class AbstractContractCreateTest extends
|
|||
} catch (AssertionError failure) {
|
||||
if (isSupported(IS_BLOBSTORE)) {
|
||||
// file/directory hack surfaces here
|
||||
throw new AssumptionViolatedException(failure.toString()).initCause(failure);
|
||||
throw new AssumptionViolatedException(failure.toString(), failure);
|
||||
}
|
||||
// else: rethrow
|
||||
throw failure;
|
||||
|
@ -163,13 +163,11 @@ public abstract class AbstractContractCreateTest extends
|
|||
public void testCreatedFileIsImmediatelyVisible() throws Throwable {
|
||||
describe("verify that a newly created file exists as soon as open returns");
|
||||
Path path = path("testCreatedFileIsImmediatelyVisible");
|
||||
FSDataOutputStream out = null;
|
||||
try {
|
||||
out = getFileSystem().create(path,
|
||||
try(FSDataOutputStream out = getFileSystem().create(path,
|
||||
false,
|
||||
4096,
|
||||
(short) 1,
|
||||
1024);
|
||||
1024)) {
|
||||
if (!getFileSystem().exists(path)) {
|
||||
|
||||
if (isSupported(IS_BLOBSTORE)) {
|
||||
|
@ -180,8 +178,6 @@ public abstract class AbstractContractCreateTest extends
|
|||
assertPathExists("expected path to be visible before anything written",
|
||||
path);
|
||||
}
|
||||
} finally {
|
||||
IOUtils.closeStream(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ public abstract class AbstractContractDeleteTest extends
|
|||
@Test
|
||||
public void testDeleteNonexistentPathRecursive() throws Throwable {
|
||||
Path path = path("testDeleteNonexistentPathRecursive");
|
||||
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "leftover", path);
|
||||
assertPathDoesNotExist("leftover", path);
|
||||
ContractTestUtils.rejectRootOperation(path);
|
||||
assertFalse("Returned true attempting to delete"
|
||||
+ " a nonexistent path " + path,
|
||||
|
@ -58,7 +58,7 @@ public abstract class AbstractContractDeleteTest extends
|
|||
@Test
|
||||
public void testDeleteNonexistentPathNonRecursive() throws Throwable {
|
||||
Path path = path("testDeleteNonexistentPathNonRecursive");
|
||||
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "leftover", path);
|
||||
assertPathDoesNotExist("leftover", path);
|
||||
ContractTestUtils.rejectRootOperation(path);
|
||||
assertFalse("Returned true attempting to recursively delete"
|
||||
+ " a nonexistent path " + path,
|
||||
|
@ -81,7 +81,7 @@ public abstract class AbstractContractDeleteTest extends
|
|||
//expected
|
||||
handleExpectedException(expected);
|
||||
}
|
||||
ContractTestUtils.assertIsDirectory(getFileSystem(), path);
|
||||
assertIsDirectory(path);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -92,7 +92,7 @@ public abstract class AbstractContractDeleteTest extends
|
|||
ContractTestUtils.writeTextFile(getFileSystem(), file, "goodbye, world",
|
||||
true);
|
||||
assertDeleted(path, true);
|
||||
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "not deleted", file);
|
||||
assertPathDoesNotExist("not deleted", file);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -100,12 +100,11 @@ public abstract class AbstractContractDeleteTest extends
|
|||
mkdirs(path("testDeleteDeepEmptyDir/d1/d2/d3/d4"));
|
||||
assertDeleted(path("testDeleteDeepEmptyDir/d1/d2/d3"), true);
|
||||
|
||||
FileSystem fs = getFileSystem();
|
||||
ContractTestUtils.assertPathDoesNotExist(fs,
|
||||
assertPathDoesNotExist(
|
||||
"not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3/d4"));
|
||||
ContractTestUtils.assertPathDoesNotExist(fs,
|
||||
assertPathDoesNotExist(
|
||||
"not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3"));
|
||||
ContractTestUtils.assertPathExists(fs, "parent dir is deleted",
|
||||
assertPathExists( "parent dir is deleted",
|
||||
path("testDeleteDeepEmptyDir/d1/d2"));
|
||||
}
|
||||
|
||||
|
@ -117,8 +116,7 @@ public abstract class AbstractContractDeleteTest extends
|
|||
Path file = new Path(path, "childfile");
|
||||
ContractTestUtils.writeTextFile(getFileSystem(), file,
|
||||
"single file to be deleted.", true);
|
||||
ContractTestUtils.assertPathExists(getFileSystem(),
|
||||
"single file not created", file);
|
||||
assertPathExists("single file not created", file);
|
||||
assertDeleted(file, false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,12 +67,9 @@ public abstract class AbstractContractMkdirTest extends AbstractFSContractTestBa
|
|||
boolean made = fs.mkdirs(path);
|
||||
fail("mkdirs did not fail over a file but returned " + made
|
||||
+ "; " + ls(path));
|
||||
} catch (ParentNotDirectoryException e) {
|
||||
} catch (ParentNotDirectoryException | FileAlreadyExistsException e) {
|
||||
//parent is a directory
|
||||
handleExpectedException(e);
|
||||
} catch (FileAlreadyExistsException e) {
|
||||
//also allowed as an exception (HDFS)
|
||||
handleExpectedException(e);;
|
||||
} catch (IOException e) {
|
||||
//here the FS says "no create"
|
||||
handleRelaxedException("mkdirs", "FileAlreadyExistsException", e);
|
||||
|
@ -97,11 +94,9 @@ public abstract class AbstractContractMkdirTest extends AbstractFSContractTestBa
|
|||
boolean made = fs.mkdirs(child);
|
||||
fail("mkdirs did not fail over a file but returned " + made
|
||||
+ "; " + ls(path));
|
||||
} catch (ParentNotDirectoryException e) {
|
||||
} catch (ParentNotDirectoryException | FileAlreadyExistsException e) {
|
||||
//parent is a directory
|
||||
handleExpectedException(e);
|
||||
} catch (FileAlreadyExistsException e) {
|
||||
handleExpectedException(e);
|
||||
} catch (IOException e) {
|
||||
handleRelaxedException("mkdirs", "ParentNotDirectoryException", e);
|
||||
}
|
||||
|
|
|
@ -125,10 +125,10 @@ public abstract class AbstractContractOpenTest extends AbstractFSContractTestBas
|
|||
createFile(getFileSystem(), path, false, block);
|
||||
//open first
|
||||
FSDataInputStream instream1 = getFileSystem().open(path);
|
||||
int c = instream1.read();
|
||||
assertEquals(0,c);
|
||||
FSDataInputStream instream2 = null;
|
||||
try {
|
||||
int c = instream1.read();
|
||||
assertEquals(0,c);
|
||||
instream2 = getFileSystem().open(path);
|
||||
assertEquals("first read of instream 2", 0, instream2.read());
|
||||
assertEquals("second read of instream 1", 1, instream1.read());
|
||||
|
|
|
@ -26,8 +26,7 @@ import org.junit.Test;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
||||
|
||||
/**
|
||||
* Test creating files, overwrite options &c
|
||||
|
@ -46,9 +45,9 @@ public abstract class AbstractContractRenameTest extends
|
|||
boolean rename = rename(renameSrc, renameTarget);
|
||||
assertTrue("rename("+renameSrc+", "+ renameTarget+") returned false",
|
||||
rename);
|
||||
ContractTestUtils.assertListStatusFinds(getFileSystem(),
|
||||
assertListStatusFinds(getFileSystem(),
|
||||
renameTarget.getParent(), renameTarget);
|
||||
ContractTestUtils.verifyFileContents(getFileSystem(), renameTarget, data);
|
||||
verifyFileContents(getFileSystem(), renameTarget, data);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -129,7 +128,7 @@ public abstract class AbstractContractRenameTest extends
|
|||
}
|
||||
// verify that the destination file is as expected based on the expected
|
||||
// outcome
|
||||
ContractTestUtils.verifyFileContents(getFileSystem(), destFile,
|
||||
verifyFileContents(getFileSystem(), destFile,
|
||||
destUnchanged? destData: srcData);
|
||||
}
|
||||
|
||||
|
@ -154,7 +153,7 @@ public abstract class AbstractContractRenameTest extends
|
|||
Path renamedSrc = new Path(destDir, sourceSubdir);
|
||||
assertIsFile(destFilePath);
|
||||
assertIsDirectory(renamedSrc);
|
||||
ContractTestUtils.verifyFileContents(fs, destFilePath, destDateset);
|
||||
verifyFileContents(fs, destFilePath, destDateset);
|
||||
assertTrue("rename returned false though the contents were copied", rename);
|
||||
}
|
||||
|
||||
|
@ -172,10 +171,10 @@ public abstract class AbstractContractRenameTest extends
|
|||
boolean rename = rename(renameSrc, renameTarget);
|
||||
if (renameCreatesDestDirs) {
|
||||
assertTrue(rename);
|
||||
ContractTestUtils.verifyFileContents(getFileSystem(), renameTarget, data);
|
||||
verifyFileContents(getFileSystem(), renameTarget, data);
|
||||
} else {
|
||||
assertFalse(rename);
|
||||
ContractTestUtils.verifyFileContents(getFileSystem(), renameSrc, data);
|
||||
verifyFileContents(getFileSystem(), renameSrc, data);
|
||||
}
|
||||
} catch (FileNotFoundException e) {
|
||||
// allowed unless that rename flag is set
|
||||
|
@ -191,36 +190,36 @@ public abstract class AbstractContractRenameTest extends
|
|||
final Path finalDir = new Path(renameTestDir, "dest");
|
||||
FileSystem fs = getFileSystem();
|
||||
boolean renameRemoveEmptyDest = isSupported(RENAME_REMOVE_DEST_IF_EMPTY_DIR);
|
||||
ContractTestUtils.rm(fs, renameTestDir, true, false);
|
||||
rm(fs, renameTestDir, true, false);
|
||||
|
||||
fs.mkdirs(srcDir);
|
||||
fs.mkdirs(finalDir);
|
||||
ContractTestUtils.writeTextFile(fs, new Path(srcDir, "source.txt"),
|
||||
writeTextFile(fs, new Path(srcDir, "source.txt"),
|
||||
"this is the file in src dir", false);
|
||||
ContractTestUtils.writeTextFile(fs, new Path(srcSubDir, "subfile.txt"),
|
||||
writeTextFile(fs, new Path(srcSubDir, "subfile.txt"),
|
||||
"this is the file in src/sub dir", false);
|
||||
|
||||
ContractTestUtils.assertPathExists(fs, "not created in src dir",
|
||||
assertPathExists("not created in src dir",
|
||||
new Path(srcDir, "source.txt"));
|
||||
ContractTestUtils.assertPathExists(fs, "not created in src/sub dir",
|
||||
assertPathExists("not created in src/sub dir",
|
||||
new Path(srcSubDir, "subfile.txt"));
|
||||
|
||||
fs.rename(srcDir, finalDir);
|
||||
// Accept both POSIX rename behavior and CLI rename behavior
|
||||
if (renameRemoveEmptyDest) {
|
||||
// POSIX rename behavior
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed into dest dir",
|
||||
assertPathExists("not renamed into dest dir",
|
||||
new Path(finalDir, "source.txt"));
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir",
|
||||
assertPathExists("not renamed into dest/sub dir",
|
||||
new Path(finalDir, "sub/subfile.txt"));
|
||||
} else {
|
||||
// CLI rename behavior
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed into dest dir",
|
||||
assertPathExists("not renamed into dest dir",
|
||||
new Path(finalDir, "src1/source.txt"));
|
||||
ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir",
|
||||
assertPathExists("not renamed into dest/sub dir",
|
||||
new Path(finalDir, "src1/sub/subfile.txt"));
|
||||
}
|
||||
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted",
|
||||
assertPathDoesNotExist("not deleted",
|
||||
new Path(srcDir, "source.txt"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
|
|||
Path dir = new Path("/testmkdirdepth1");
|
||||
assertPathDoesNotExist("directory already exists", dir);
|
||||
fs.mkdirs(dir);
|
||||
ContractTestUtils.assertIsDirectory(getFileSystem(), dir);
|
||||
assertIsDirectory(dir);
|
||||
assertPathExists("directory already exists", dir);
|
||||
assertDeleted(dir, true);
|
||||
}
|
||||
|
@ -61,10 +61,10 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
|
|||
//extra sanity checks here to avoid support calls about complete loss of data
|
||||
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
|
||||
Path root = new Path("/");
|
||||
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
|
||||
assertIsDirectory(root);
|
||||
boolean deleted = getFileSystem().delete(root, true);
|
||||
LOG.info("rm / of empty dir result is {}", deleted);
|
||||
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
|
||||
assertIsDirectory(root);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -75,7 +75,7 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
|
|||
String touchfile = "/testRmNonEmptyRootDirNonRecursive";
|
||||
Path file = new Path(touchfile);
|
||||
ContractTestUtils.touch(getFileSystem(), file);
|
||||
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
|
||||
assertIsDirectory(root);
|
||||
try {
|
||||
boolean deleted = getFileSystem().delete(root, false);
|
||||
fail("non recursive delete should have raised an exception," +
|
||||
|
@ -86,7 +86,7 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
|
|||
} finally {
|
||||
getFileSystem().delete(file, false);
|
||||
}
|
||||
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
|
||||
assertIsDirectory(root);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -94,11 +94,11 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
|
|||
//extra sanity checks here to avoid support calls about complete loss of data
|
||||
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
|
||||
Path root = new Path("/");
|
||||
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
|
||||
assertIsDirectory(root);
|
||||
Path file = new Path("/testRmRootRecursive");
|
||||
ContractTestUtils.touch(getFileSystem(), file);
|
||||
boolean deleted = getFileSystem().delete(root, true);
|
||||
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
|
||||
assertIsDirectory(root);
|
||||
LOG.info("rm -rf / result is {}", deleted);
|
||||
if (deleted) {
|
||||
assertPathDoesNotExist("expected file to be deleted", file);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.fs.contract;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.Test;
|
||||
|
@ -31,9 +32,9 @@ import java.io.EOFException;
|
|||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.cleanup;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyRead;
|
||||
|
||||
|
@ -46,7 +47,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
|
|||
|
||||
public static final int DEFAULT_RANDOM_SEEK_COUNT = 100;
|
||||
|
||||
private Path testPath;
|
||||
private Path smallSeekFile;
|
||||
private Path zeroByteFile;
|
||||
private FSDataInputStream instream;
|
||||
|
@ -56,13 +56,13 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
|
|||
super.setup();
|
||||
skipIfUnsupported(SUPPORTS_SEEK);
|
||||
//delete the test directory
|
||||
testPath = getContract().getTestPath();
|
||||
smallSeekFile = path("seekfile.txt");
|
||||
zeroByteFile = path("zero.txt");
|
||||
byte[] block = dataset(TEST_FILE_LEN, 0, 255);
|
||||
//this file now has a simple rule: offset => value
|
||||
createFile(getFileSystem(), smallSeekFile, false, block);
|
||||
touch(getFileSystem(), zeroByteFile);
|
||||
FileSystem fs = getFileSystem();
|
||||
createFile(fs, smallSeekFile, true, block);
|
||||
touch(fs, zeroByteFile);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -79,6 +79,21 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
|
|||
super.teardown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip a test case if the FS doesn't support positioned readable.
|
||||
* This should hold automatically if the FS supports seek, even
|
||||
* if it doesn't support seeking past the EOF.
|
||||
* And, because this test suite requires seek to be supported, the
|
||||
* feature is automatically assumed to be true unless stated otherwise.
|
||||
*/
|
||||
protected void assumeSupportsPositionedReadable() throws IOException {
|
||||
// because this ,
|
||||
if (!getContract().isSupported(SUPPORTS_POSITIONED_READABLE, true)) {
|
||||
skip("Skipping as unsupported feature: "
|
||||
+ SUPPORTS_POSITIONED_READABLE);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekZeroByteFile() throws Throwable {
|
||||
describe("seek and read a 0 byte file");
|
||||
|
@ -282,6 +297,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
|
|||
public void testPositionedBulkReadDoesntChangePosition() throws Throwable {
|
||||
describe(
|
||||
"verify that a positioned read does not change the getPos() value");
|
||||
assumeSupportsPositionedReadable();
|
||||
Path testSeekFile = path("bigseekfile.txt");
|
||||
byte[] block = dataset(65536, 0, 255);
|
||||
createFile(getFileSystem(), testSeekFile, false, block);
|
||||
|
@ -290,8 +306,9 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
|
|||
assertTrue(-1 != instream.read());
|
||||
assertEquals(40000, instream.getPos());
|
||||
|
||||
byte[] readBuffer = new byte[256];
|
||||
instream.read(128, readBuffer, 0, readBuffer.length);
|
||||
int v = 256;
|
||||
byte[] readBuffer = new byte[v];
|
||||
assertEquals(v, instream.read(128, readBuffer, 0, v));
|
||||
//have gone back
|
||||
assertEquals(40000, instream.getPos());
|
||||
//content is the same too
|
||||
|
@ -317,12 +334,11 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
|
|||
Path randomSeekFile = path("testrandomseeks.bin");
|
||||
createFile(getFileSystem(), randomSeekFile, false, buf);
|
||||
Random r = new Random();
|
||||
FSDataInputStream stm = getFileSystem().open(randomSeekFile);
|
||||
|
||||
// Record the sequence of seeks and reads which trigger a failure.
|
||||
int[] seeks = new int[10];
|
||||
int[] reads = new int[10];
|
||||
try {
|
||||
try (FSDataInputStream stm = getFileSystem().open(randomSeekFile)) {
|
||||
for (int i = 0; i < limit; i++) {
|
||||
int seekOff = r.nextInt(buf.length);
|
||||
int toRead = r.nextInt(Math.min(buf.length - seekOff, 32000));
|
||||
|
@ -336,13 +352,232 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
|
|||
sb.append("Sequence of actions:\n");
|
||||
for (int j = 0; j < seeks.length; j++) {
|
||||
sb.append("seek @ ").append(seeks[j]).append(" ")
|
||||
.append("read ").append(reads[j]).append("\n");
|
||||
.append("read ").append(reads[j]).append("\n");
|
||||
}
|
||||
LOG.error(sb.toString());
|
||||
throw afe;
|
||||
} finally {
|
||||
stm.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadFullyZeroByteFile() throws Throwable {
|
||||
describe("readFully against a 0 byte file");
|
||||
assumeSupportsPositionedReadable();
|
||||
instream = getFileSystem().open(zeroByteFile);
|
||||
assertEquals(0, instream.getPos());
|
||||
byte[] buffer = new byte[1];
|
||||
instream.readFully(0, buffer, 0, 0);
|
||||
assertEquals(0, instream.getPos());
|
||||
// seek to 0 read 0 bytes from it
|
||||
instream.seek(0);
|
||||
assertEquals(0, instream.read(buffer, 0, 0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadFullyPastEOFZeroByteFile() throws Throwable {
|
||||
assumeSupportsPositionedReadable();
|
||||
describe("readFully past the EOF of a 0 byte file");
|
||||
instream = getFileSystem().open(zeroByteFile);
|
||||
byte[] buffer = new byte[1];
|
||||
// try to read past end of file
|
||||
try {
|
||||
instream.readFully(0, buffer, 0, 16);
|
||||
fail("Expected an exception");
|
||||
} catch (IllegalArgumentException | IndexOutOfBoundsException
|
||||
| EOFException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadFullySmallFile() throws Throwable {
|
||||
describe("readFully operations");
|
||||
assumeSupportsPositionedReadable();
|
||||
instream = getFileSystem().open(smallSeekFile);
|
||||
byte[] buffer = new byte[256];
|
||||
// expect negative length to fail
|
||||
try {
|
||||
instream.readFully(0, buffer, 0, -16);
|
||||
fail("Expected an exception");
|
||||
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
|
||||
// expected
|
||||
}
|
||||
// negative offset into buffer
|
||||
try {
|
||||
instream.readFully(0, buffer, -1, 16);
|
||||
fail("Expected an exception");
|
||||
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
|
||||
// expected
|
||||
}
|
||||
// expect negative position to fail, ideally with EOF
|
||||
try {
|
||||
instream.readFully(-1, buffer);
|
||||
fail("Expected an exception");
|
||||
} catch (EOFException e) {
|
||||
handleExpectedException(e);
|
||||
} catch (IOException |IllegalArgumentException | IndexOutOfBoundsException e) {
|
||||
handleRelaxedException("readFully with a negative position ",
|
||||
"EOFException",
|
||||
e);
|
||||
}
|
||||
|
||||
// read more than the offset allows
|
||||
try {
|
||||
instream.readFully(0, buffer, buffer.length - 8, 16);
|
||||
fail("Expected an exception");
|
||||
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
// read properly
|
||||
assertEquals(0, instream.getPos());
|
||||
instream.readFully(0, buffer);
|
||||
assertEquals(0, instream.getPos());
|
||||
|
||||
// now read the entire file in one go
|
||||
byte[] fullFile = new byte[TEST_FILE_LEN];
|
||||
instream.readFully(0, fullFile);
|
||||
assertEquals(0, instream.getPos());
|
||||
|
||||
try {
|
||||
instream.readFully(16, fullFile);
|
||||
fail("Expected an exception");
|
||||
} catch (EOFException e) {
|
||||
handleExpectedException(e);
|
||||
} catch (IOException e) {
|
||||
handleRelaxedException("readFully which reads past EOF ",
|
||||
"EOFException",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadFullyPastEOF() throws Throwable {
|
||||
describe("readFully past the EOF of a file");
|
||||
assumeSupportsPositionedReadable();
|
||||
instream = getFileSystem().open(smallSeekFile);
|
||||
byte[] buffer = new byte[256];
|
||||
|
||||
// now read past the end of the file
|
||||
try {
|
||||
instream.readFully(TEST_FILE_LEN + 1, buffer);
|
||||
fail("Expected an exception");
|
||||
} catch (EOFException e) {
|
||||
handleExpectedException(e);
|
||||
} catch (IOException e) {
|
||||
handleRelaxedException("readFully with an offset past EOF ",
|
||||
"EOFException",
|
||||
e);
|
||||
}
|
||||
// read zero bytes from an offset past EOF.
|
||||
try {
|
||||
instream.readFully(TEST_FILE_LEN + 1, buffer, 0, 0);
|
||||
// a zero byte read may fail-fast
|
||||
LOG.info("Filesystem short-circuits 0-byte reads");
|
||||
} catch (EOFException e) {
|
||||
handleExpectedException(e);
|
||||
} catch (IOException e) {
|
||||
handleRelaxedException("readFully(0 bytes) with an offset past EOF ",
|
||||
"EOFException",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadFullyZeroBytebufferPastEOF() throws Throwable {
|
||||
describe("readFully zero bytes from an offset past EOF");
|
||||
assumeSupportsPositionedReadable();
|
||||
instream = getFileSystem().open(smallSeekFile);
|
||||
byte[] buffer = new byte[256];
|
||||
try {
|
||||
instream.readFully(TEST_FILE_LEN + 1, buffer, 0, 0);
|
||||
// a zero byte read may fail-fast
|
||||
LOG.info("Filesystem short-circuits 0-byte reads");
|
||||
} catch (EOFException e) {
|
||||
handleExpectedException(e);
|
||||
} catch (IOException e) {
|
||||
handleRelaxedException("readFully(0 bytes) with an offset past EOF ",
|
||||
"EOFException",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadNullBuffer() throws Throwable {
|
||||
describe("try to read a null buffer ");
|
||||
assumeSupportsPositionedReadable();
|
||||
try (FSDataInputStream in = getFileSystem().open(smallSeekFile)) {
|
||||
// Null buffer
|
||||
int r = in.read(0, null, 0, 16);
|
||||
fail("Expected an exception from a read into a null buffer, got " + r);
|
||||
} catch (IllegalArgumentException e) {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadSmallFile() throws Throwable {
|
||||
describe("PositionedRead.read operations");
|
||||
assumeSupportsPositionedReadable();
|
||||
instream = getFileSystem().open(smallSeekFile);
|
||||
byte[] buffer = new byte[256];
|
||||
int r;
|
||||
// expect negative length to fail
|
||||
try {
|
||||
r = instream.read(0, buffer, 0, -16);
|
||||
fail("Expected an exception, got " + r);
|
||||
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
|
||||
// expected
|
||||
}
|
||||
// negative offset into buffer
|
||||
try {
|
||||
r = instream.read(0, buffer, -1, 16);
|
||||
fail("Expected an exception, got " + r);
|
||||
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
|
||||
// expected
|
||||
}
|
||||
// negative position
|
||||
try {
|
||||
r = instream.read(-1, buffer, 0, 16);
|
||||
fail("Expected an exception, got " + r);
|
||||
} catch (EOFException e) {
|
||||
handleExpectedException(e);
|
||||
} catch (IOException | IllegalArgumentException | IndexOutOfBoundsException e) {
|
||||
handleRelaxedException("read() with a negative position ",
|
||||
"EOFException",
|
||||
e);
|
||||
}
|
||||
|
||||
// read more than the offset allows
|
||||
try {
|
||||
r = instream.read(0, buffer, buffer.length - 8, 16);
|
||||
fail("Expected an exception, got " + r);
|
||||
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
|
||||
// expected
|
||||
}
|
||||
|
||||
// read properly
|
||||
assertEquals(0, instream.getPos());
|
||||
instream.readFully(0, buffer);
|
||||
assertEquals(0, instream.getPos());
|
||||
|
||||
// now read the entire file in one go
|
||||
byte[] fullFile = new byte[TEST_FILE_LEN];
|
||||
assertEquals(TEST_FILE_LEN,
|
||||
instream.read(0, fullFile, 0, fullFile.length));
|
||||
assertEquals(0, instream.getPos());
|
||||
|
||||
// now read past the end of the file
|
||||
assertEquals(-1,
|
||||
instream.read(TEST_FILE_LEN + 16, buffer, 0, 1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadAtExactEOF() throws Throwable {
|
||||
describe("read at the end of the file");
|
||||
instream = getFileSystem().open(smallSeekFile);
|
||||
instream.seek(TEST_FILE_LEN -1);
|
||||
assertTrue("read at last byte", instream.read() > 0);
|
||||
assertEquals("read just past EOF", -1, instream.read());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,7 +57,7 @@ public abstract class AbstractFSContractTestBase extends Assert
|
|||
public static final int DEFAULT_TEST_TIMEOUT = 180 * 1000;
|
||||
|
||||
/**
|
||||
* The FS contract used for these tets
|
||||
* The FS contract used for these tests
|
||||
*/
|
||||
private AbstractFSContract contract;
|
||||
|
||||
|
|
|
@ -53,20 +53,20 @@ public interface ContractOptions {
|
|||
/**
|
||||
* Flag to indicate that the FS can rename into directories that
|
||||
* don't exist, creating them as needed.
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String RENAME_CREATES_DEST_DIRS = "rename-creates-dest-dirs";
|
||||
|
||||
/**
|
||||
* Flag to indicate that the FS does not follow the rename contract -and
|
||||
* instead only returns false on a failure.
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String RENAME_OVERWRITES_DEST = "rename-overwrites-dest";
|
||||
|
||||
/**
|
||||
* Flag to indicate that the FS returns false if the destination exists
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String RENAME_RETURNS_FALSE_IF_DEST_EXISTS =
|
||||
"rename-returns-false-if-dest-exists";
|
||||
|
@ -74,7 +74,7 @@ public interface ContractOptions {
|
|||
/**
|
||||
* Flag to indicate that the FS returns false on a rename
|
||||
* if the source is missing
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String RENAME_RETURNS_FALSE_IF_SOURCE_MISSING =
|
||||
"rename-returns-false-if-source-missing";
|
||||
|
@ -82,74 +82,74 @@ public interface ContractOptions {
|
|||
/**
|
||||
* Flag to indicate that the FS remove dest first if it is an empty directory
|
||||
* mean the FS honors POSIX rename behavior.
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String RENAME_REMOVE_DEST_IF_EMPTY_DIR = "rename-remove-dest-if-empty-dir";
|
||||
|
||||
/**
|
||||
* Flag to indicate that append is supported
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_APPEND = "supports-append";
|
||||
|
||||
/**
|
||||
* Flag to indicate that setTimes is supported.
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_SETTIMES = "supports-settimes";
|
||||
|
||||
/**
|
||||
* Flag to indicate that getFileStatus is supported.
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_GETFILESTATUS = "supports-getfilestatus";
|
||||
|
||||
/**
|
||||
* Flag to indicate that renames are atomic
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_ATOMIC_RENAME = "supports-atomic-rename";
|
||||
|
||||
/**
|
||||
* Flag to indicate that directory deletes are atomic
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_ATOMIC_DIRECTORY_DELETE = "supports-atomic-directory-delete";
|
||||
|
||||
/**
|
||||
* Does the FS support multiple block locations?
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_BLOCK_LOCALITY = "supports-block-locality";
|
||||
|
||||
/**
|
||||
* Does the FS support the concat() operation?
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_CONCAT = "supports-concat";
|
||||
|
||||
/**
|
||||
* Is seeking supported at all?
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_SEEK = "supports-seek";
|
||||
|
||||
/**
|
||||
* Is seeking past the EOF allowed?
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String REJECTS_SEEK_PAST_EOF = "rejects-seek-past-eof";
|
||||
|
||||
/**
|
||||
* Is seeking on a closed file supported? Some filesystems only raise an
|
||||
* exception later, when trying to read.
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_SEEK_ON_CLOSED_FILE = "supports-seek-on-closed-file";
|
||||
|
||||
/**
|
||||
* Is available() on a closed InputStream supported?
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_AVAILABLE_ON_CLOSED_FILE = "supports-available-on-closed-file";
|
||||
|
||||
|
@ -157,32 +157,39 @@ public interface ContractOptions {
|
|||
* Flag to indicate that this FS expects to throw the strictest
|
||||
* exceptions it can, not generic IOEs, which, if returned,
|
||||
* must be rejected.
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_STRICT_EXCEPTIONS = "supports-strict-exceptions";
|
||||
|
||||
/**
|
||||
* Are unix permissions
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_UNIX_PERMISSIONS = "supports-unix-permissions";
|
||||
|
||||
/**
|
||||
* Is positioned readable supported? Supporting seek should be sufficient
|
||||
* for this.
|
||||
* {@value}
|
||||
*/
|
||||
String SUPPORTS_POSITIONED_READABLE = "supports-positioned-readable";
|
||||
|
||||
/**
|
||||
* Maximum path length
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String MAX_PATH_ = "max-path";
|
||||
|
||||
/**
|
||||
* Maximum filesize: 0 or -1 for no limit
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String MAX_FILESIZE = "max-filesize";
|
||||
|
||||
/**
|
||||
* Flag to indicate that tests on the root directories of a filesystem/
|
||||
* object store are permitted
|
||||
* @{value}
|
||||
* {@value}
|
||||
*/
|
||||
String TEST_ROOT_TESTS_ENABLED = "test.root-tests-enabled";
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.internal.AssumptionViolatedException;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -432,9 +433,7 @@ public class ContractTestUtils extends Assert {
|
|||
* @throws AssertionError with the text and throwable -always
|
||||
*/
|
||||
public static void fail(String text, Throwable thrown) {
|
||||
AssertionError e = new AssertionError(text);
|
||||
e.initCause(thrown);
|
||||
throw e;
|
||||
throw new AssertionError(text, thrown);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -509,10 +508,14 @@ public class ContractTestUtils extends Assert {
|
|||
boolean overwrite,
|
||||
byte[] data) throws IOException {
|
||||
FSDataOutputStream stream = fs.create(path, overwrite);
|
||||
if (data != null && data.length > 0) {
|
||||
stream.write(data);
|
||||
try {
|
||||
if (data != null && data.length > 0) {
|
||||
stream.write(data);
|
||||
}
|
||||
stream.close();
|
||||
} finally {
|
||||
IOUtils.closeStream(stream);
|
||||
}
|
||||
stream.close();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -574,13 +577,10 @@ public class ContractTestUtils extends Assert {
|
|||
public static String readBytesToString(FileSystem fs,
|
||||
Path path,
|
||||
int length) throws IOException {
|
||||
FSDataInputStream in = fs.open(path);
|
||||
try {
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
byte[] buf = new byte[length];
|
||||
in.readFully(0, buf);
|
||||
return toChar(buf);
|
||||
} finally {
|
||||
in.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -786,8 +786,7 @@ public class ContractTestUtils extends Assert {
|
|||
|
||||
long totalBytesRead = 0;
|
||||
int nextExpectedNumber = 0;
|
||||
final InputStream inputStream = fs.open(path);
|
||||
try {
|
||||
try (InputStream inputStream = fs.open(path)) {
|
||||
while (true) {
|
||||
final int bytesRead = inputStream.read(testBuffer);
|
||||
if (bytesRead < 0) {
|
||||
|
@ -814,8 +813,6 @@ public class ContractTestUtils extends Assert {
|
|||
throw new IOException("Expected to read " + expectedSize +
|
||||
" bytes but only received " + totalBytesRead);
|
||||
}
|
||||
} finally {
|
||||
inputStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ case sensitivity and permission options are determined at run time from OS type
|
|||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<!-- checksum FS doesn't allow seeing past EOF -->
|
||||
<!-- checksum FS doesn't allow seeking past EOF -->
|
||||
<property>
|
||||
<name>fs.contract.rejects-seek-past-eof</name>
|
||||
<value>true</value>
|
||||
|
|
|
@ -1459,6 +1459,10 @@ public class DFSInputStream extends FSInputStream
|
|||
@Override
|
||||
public int read(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
validatePositionedReadArgs(position, buffer, offset, length);
|
||||
if (length == 0) {
|
||||
return 0;
|
||||
}
|
||||
try (TraceScope ignored = dfsClient.
|
||||
newPathTraceScope("DFSInputStream#byteArrayPread", src)) {
|
||||
return pread(position, buffer, offset, length);
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Map;
|
|||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.commons.io.input.BoundedInputStream;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.http.HttpStatus;
|
||||
|
||||
|
@ -129,6 +130,9 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|||
@VisibleForTesting
|
||||
protected InputStreamAndFileLength openInputStream(long startOffset)
|
||||
throws IOException {
|
||||
if (startOffset < 0) {
|
||||
throw new EOFException("Negative Position");
|
||||
}
|
||||
// Use the original url if no resolved url exists, eg. if
|
||||
// it's the first time a request is made.
|
||||
final boolean resolved = resolvedURL.getURL() != null;
|
||||
|
@ -250,6 +254,10 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|||
@Override
|
||||
public int read(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
validatePositionedReadArgs(position, buffer, offset, length);
|
||||
if (length == 0) {
|
||||
return 0;
|
||||
}
|
||||
try (InputStream in = openInputStream(position).in) {
|
||||
return in.read(buffer, offset, length);
|
||||
}
|
||||
|
@ -258,17 +266,21 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
|||
@Override
|
||||
public void readFully(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
final InputStreamAndFileLength fin = openInputStream(position);
|
||||
if (fin.length != null && length + position > fin.length) {
|
||||
throw new EOFException("The length to read " + length
|
||||
+ " exceeds the file length " + fin.length);
|
||||
validatePositionedReadArgs(position, buffer, offset, length);
|
||||
if (length == 0) {
|
||||
return;
|
||||
}
|
||||
final InputStreamAndFileLength fin = openInputStream(position);
|
||||
try {
|
||||
if (fin.length != null && length + position > fin.length) {
|
||||
throw new EOFException("The length to read " + length
|
||||
+ " exceeds the file length " + fin.length);
|
||||
}
|
||||
int nread = 0;
|
||||
while (nread < length) {
|
||||
int nbytes = fin.in.read(buffer, offset + nread, length - nread);
|
||||
if (nbytes < 0) {
|
||||
throw new EOFException("End of file reached before reading fully.");
|
||||
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
|
||||
}
|
||||
nread += nbytes;
|
||||
}
|
||||
|
|
|
@ -137,6 +137,8 @@ public class S3AInputStream extends FSInputStream {
|
|||
LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
|
||||
reopen(pos);
|
||||
byteRead = wrappedStream.read();
|
||||
} catch (EOFException e) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (byteRead >= 0) {
|
||||
|
@ -216,4 +218,42 @@ public class S3AInputStream extends FSInputStream {
|
|||
public boolean markSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclass {@code readFully()} operation which only seeks at the start
|
||||
* of the series of operations; seeking back at the end.
|
||||
*
|
||||
* This is significantly higher performance if multiple read attempts are
|
||||
* needed to fetch the data, as it does not break the HTTP connection.
|
||||
*
|
||||
* To maintain thread safety requirements, this operation is synchronized
|
||||
* for the duration of the sequence.
|
||||
* {@inheritDoc}
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
public void readFully(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
validatePositionedReadArgs(position, buffer, offset, length);
|
||||
if (length == 0) {
|
||||
return;
|
||||
}
|
||||
int nread = 0;
|
||||
synchronized (this) {
|
||||
long oldPos = getPos();
|
||||
try {
|
||||
seek(position);
|
||||
while (nread < length) {
|
||||
int nbytes = read(buffer, offset + nread, length - nread);
|
||||
if (nbytes < 0) {
|
||||
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
|
||||
}
|
||||
nread += nbytes;
|
||||
}
|
||||
|
||||
} finally {
|
||||
seek(oldPos);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -725,6 +725,8 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// Return to the caller with the result.
|
||||
//
|
||||
return result;
|
||||
} catch(EOFException e) {
|
||||
return -1;
|
||||
} catch(IOException e) {
|
||||
|
||||
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
|
||||
|
@ -773,7 +775,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
pos += result;
|
||||
}
|
||||
|
||||
if (null != statistics) {
|
||||
if (null != statistics && result > 0) {
|
||||
statistics.incrementBytesRead(result);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue