HADOOP-12994. Specify PositionedReadable, add contract tests, fix problems. Contributed by Steve Loughran.

This commit is contained in:
Chris Nauroth 2016-04-08 13:36:58 -07:00
parent 9cb0c963d2
commit 843ee8d59d
28 changed files with 575 additions and 174 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.crypto;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FilterInputStream;
@ -34,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.fs.PositionedReadable;
@ -395,7 +397,9 @@ public class CryptoInputStream extends FilterInputStream implements
/** Seek to a position. */
@Override
public void seek(long pos) throws IOException {
Preconditions.checkArgument(pos >= 0, "Cannot seek to negative offset.");
if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
}
checkStream();
try {
/*

View File

@ -182,20 +182,18 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
public int read(long position, byte[] b, int off, int len)
throws IOException {
// parameter check
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
validatePositionedReadArgs(position, b, off, len);
if (len == 0) {
return 0;
}
if( position<0 ) {
throw new IllegalArgumentException(
"Parameter position can not to be negative");
}
ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
checker.seek(position);
int nread = checker.read(b, off, len);
checker.close();
int nread;
try (ChecksumFSInputChecker checker =
new ChecksumFSInputChecker(fs, file)) {
checker.seek(position);
nread = checker.read(b, off, len);
checker.close();
}
return nread;
}

View File

@ -164,28 +164,26 @@ public abstract class ChecksumFs extends FilterFs {
public int available() throws IOException {
return datas.available() + super.available();
}
@Override
public int read(long position, byte[] b, int off, int len)
throws IOException, UnresolvedLinkException {
// parameter check
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
validatePositionedReadArgs(position, b, off, len);
if (len == 0) {
return 0;
}
if (position<0) {
throw new IllegalArgumentException(
"Parameter position can not to be negative");
}
ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
checker.seek(position);
int nread = checker.read(b, off, len);
checker.close();
int nread;
try (ChecksumFSInputChecker checker =
new ChecksumFSInputChecker(fs, file)) {
checker.seek(position);
nread = checker.read(b, off, len);
checker.close();
}
return nread;
}
@Override
public void close() throws IOException {
datas.close();

View File

@ -18,18 +18,21 @@
*/
package org.apache.hadoop.fs;
import java.io.*;
import java.io.DataInputStream;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.util.IdentityHashStore;
/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
* and buffers input through a {@link BufferedInputStream}. */
* and buffers input through a {@link java.io.BufferedInputStream}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataInputStream extends DataInputStream
@ -97,6 +100,7 @@ public class FSDataInputStream extends DataInputStream
* @param buffer buffer into which data is read
* @param offset offset into the buffer in which data is written
* @param length the number of bytes to read
* @throws IOException IO problems
* @throws EOFException If the end of stream is reached while reading.
* If an exception is thrown an undetermined number
* of bytes in the buffer may have been written.

View File

@ -40,4 +40,10 @@ public class FSExceptionMessages {
*/
public static final String CANNOT_SEEK_PAST_EOF =
"Attempted to seek or read past the end of the file";
public static final String EOF_IN_READ_FULLY =
"End of file reached before reading fully.";
public static final String TOO_MANY_BYTES_FOR_DEST_BUFFER
= "Requested more bytes than destination buffer size";
}

View File

@ -17,22 +17,28 @@
*/
package org.apache.hadoop.fs;
import java.io.*;
import java.nio.ByteBuffer;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ZeroCopyUnavailableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/****************************************************************
* FSInputStream is a generic old InputStream with a little bit
* of RAF-style seek ability.
*
*****************************************************************/
@InterfaceAudience.LimitedPrivate({"HDFS"})
@InterfaceStability.Unstable
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class FSInputStream extends InputStream
implements Seekable, PositionedReadable {
private static final Logger LOG =
LoggerFactory.getLogger(FSInputStream.class);
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
@ -57,32 +63,69 @@ public abstract class FSInputStream extends InputStream
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return 0;
}
synchronized (this) {
long oldPos = getPos();
int nread = -1;
try {
seek(position);
nread = read(buffer, offset, length);
} catch (EOFException e) {
// end of file; this can be raised by some filesystems
// (often: object stores); it is swallowed here.
LOG.debug("Downgrading EOFException raised trying to" +
" read {} bytes at offset {}", length, offset, e);
} finally {
seek(oldPos);
}
return nread;
}
}
/**
* Validation code, available for use in subclasses.
* @param position position: if negative an EOF exception is raised
* @param buffer destination buffer
* @param offset offset within the buffer
* @param length length of bytes to read
* @throws EOFException if the position is negative
* @throws IndexOutOfBoundsException if there isn't space for the amount of
* data requested.
* @throws IllegalArgumentException other arguments are invalid.
*/
protected void validatePositionedReadArgs(long position,
byte[] buffer, int offset, int length) throws EOFException {
Preconditions.checkArgument(length >= 0, "length is negative");
if (position < 0) {
throw new EOFException("position is negative");
}
Preconditions.checkArgument(buffer != null, "Null buffer");
if (buffer.length - offset < length) {
throw new IndexOutOfBoundsException(
FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER);
}
}
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
int nread = 0;
while (nread < length) {
int nbytes = read(position+nread, buffer, offset+nread, length-nread);
int nbytes = read(position + nread,
buffer,
offset + nread,
length - nread);
if (nbytes < 0) {
throw new EOFException("End of file reached before reading fully.");
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
nread += nbytes;
}
}
@Override
public void readFully(long position, byte[] buffer)
throws IOException {

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Progressable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@ -1053,16 +1054,15 @@ public class HarFileSystem extends FileSystem {
@Override
public void readFully(long pos, byte[] b, int offset, int length)
throws IOException {
validatePositionedReadArgs(pos, b, offset, length);
if (length == 0) {
return;
}
if (start + length + pos > end) {
throw new IOException("Not enough bytes to read.");
throw new EOFException("Not enough bytes to read.");
}
underLyingStream.readFully(pos + start, b, offset, length);
}
@Override
public void readFully(long pos, byte[] b) throws IOException {
readFully(pos, b, 0, b.length);
}
@Override
public void setReadahead(Long readahead) throws IOException {

View File

@ -22,30 +22,67 @@ import java.io.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/** Stream that permits positional reading. */
/**
* Stream that permits positional reading.
*
* Implementations are required to implement thread-safe operations; this may
* be supported by concurrent access to the data, or by using a synchronization
* mechanism to serialize access.
*
* Not all implementations meet this requirement. Those that do not cannot
* be used as a backing store for some applications, such as Apache HBase.
*
* Independent of whether or not they are thread safe, some implementations
* may make the intermediate state of the system, specifically the position
* obtained in {@code Seekable.getPos()} visible.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface PositionedReadable {
/**
* Read upto the specified number of bytes, from a given
* Read up to the specified number of bytes, from a given
* position within a file, and return the number of bytes read. This does not
* change the current offset of a file, and is thread-safe.
*
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
* @param position position within file
* @param buffer destination buffer
* @param offset offset in the buffer
* @param length number of bytes to read
* @return actual number of bytes read; -1 means "none"
* @throws IOException IO problems.
*/
public int read(long position, byte[] buffer, int offset, int length)
int read(long position, byte[] buffer, int offset, int length)
throws IOException;
/**
* Read the specified number of bytes, from a given
* position within a file. This does not
* change the current offset of a file, and is thread-safe.
*
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
* @param position position within file
* @param buffer destination buffer
* @param offset offset in the buffer
* @param length number of bytes to read
* @throws IOException IO problems.
* @throws EOFException the end of the data was reached before
* the read operation completed
*/
public void readFully(long position, byte[] buffer, int offset, int length)
void readFully(long position, byte[] buffer, int offset, int length)
throws IOException;
/**
* Read number of bytes equal to the length of the buffer, from a given
* position within a file. This does not
* change the current offset of a file, and is thread-safe.
*
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
* @param position position within file
* @param buffer destination buffer
* @throws IOException IO problems.
* @throws EOFException the end of the data was reached before
* the read operation completed
*/
public void readFully(long position, byte[] buffer) throws IOException;
void readFully(long position, byte[] buffer) throws IOException;
}

View File

@ -160,6 +160,8 @@ public class RawLocalFileSystem extends FileSystem {
@Override
public int read(byte[] b, int off, int len) throws IOException {
// parameter check
validatePositionedReadArgs(position, b, off, len);
try {
int value = fis.read(b, off, len);
if (value > 0) {
@ -175,6 +177,12 @@ public class RawLocalFileSystem extends FileSystem {
@Override
public int read(long position, byte[] b, int off, int len)
throws IOException {
// parameter check
validatePositionedReadArgs(position, b, off, len);
if (len == 0) {
return 0;
}
ByteBuffer bb = ByteBuffer.wrap(b, off, len);
try {
int value = fis.getChannel().read(bb, position);

View File

@ -279,6 +279,9 @@ on the underlying stream:
read(dest3, ... len3) -> dest3[0..len3 - 1] =
[data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]
Note that implementations are not required to be atomic; the intermediate state
of the operation (the change in the value of `getPos()`) may be visible.
#### Implementation preconditions
Not all `FSDataInputStream` implementations support these operations. Those that do
@ -287,7 +290,7 @@ interface.
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
This could be considered obvious: if a stream is not Seekable, a client
This could be considered obvious: if a stream is not `Seekable`, a client
cannot seek to a location. It is also a side effect of the
base class implementation, which uses `Seekable.seek()`.
@ -304,14 +307,14 @@ For any operations that fail, the contents of the destination
`buffer` are undefined. Implementations may overwrite part
or all of the buffer before reporting a failure.
### `int PositionedReadable.read(position, buffer, offset, length)`
Read as much data as possible into the buffer space allocated for it.
#### Preconditions
position > 0 else raise [IllegalArgumentException, RuntimeException]
len(buffer) + offset < len(data) else raise [IndexOutOfBoundException, RuntimeException]
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
length >= 0
offset >= 0
@ -324,23 +327,36 @@ of data available from the specified position:
buffer'[offset..(offset+available-1)] = data[position..position+available -1]
result = available
1. A return value of -1 means that the stream had no more available data.
1. An invocation with `length==0` implicitly does not read any data;
implementations may short-cut the operation and omit any IO. In such instances,
checks for the stream being at the end of the file may be omitted.
1. If an IO exception occurs during the read operation(s),
the final state of `buffer` is undefined.
### `void PositionedReadable.readFully(position, buffer, offset, length)`
Read exactly `length` bytes of data into the buffer, failing if there is not
enough data available.
#### Preconditions
position > 0 else raise [IllegalArgumentException, RuntimeException]
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
length >= 0
offset >= 0
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
(position + length) <= len(data) else raise [EOFException, IOException]
len(buffer) + offset < len(data)
If an IO exception occurs during the read operation(s),
the final state of `buffer` is undefined.
If there is not enough data in the input stream to satisfy the requests,
the final state of `buffer` is undefined.
#### Postconditions
The amount of data read is the less of the length or the amount
of data available from the specified position:
The buffer from offset `offset` is filled with the data starting at `position`
let available = min(length, len(data)-position)
buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]
### `PositionedReadable.readFully(position, buffer)`
@ -349,6 +365,9 @@ The semantics of this are exactly equivalent to
readFully(position, buffer, 0, len(buffer))
That is, the buffer is filled entirely with the contents of the input source
from position `position`
## Consistency

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.crypto;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -29,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption;
@ -339,7 +341,7 @@ public abstract class CryptoStreamsTestBase {
try {
((PositionedReadable) in).readFully(pos, result);
Assert.fail("Read fully exceeds maximum length should fail.");
} catch (IOException e) {
} catch (EOFException e) {
}
}
@ -365,9 +367,9 @@ public abstract class CryptoStreamsTestBase {
try {
seekCheck(in, -3);
Assert.fail("Seek to negative offset should fail.");
} catch (IllegalArgumentException e) {
GenericTestUtils.assertExceptionContains("Cannot seek to negative " +
"offset", e);
} catch (EOFException e) {
GenericTestUtils.assertExceptionContains(
FSExceptionMessages.NEGATIVE_SEEK, e);
}
Assert.assertEquals(pos, ((Seekable) in).getPos());

View File

@ -52,11 +52,8 @@ public abstract class AbstractContractAppendTest extends AbstractFSContractTestB
public void testAppendToEmptyFile() throws Throwable {
touch(getFileSystem(), target);
byte[] dataset = dataset(256, 'a', 'z');
FSDataOutputStream outputStream = getFileSystem().append(target);
try {
try (FSDataOutputStream outputStream = getFileSystem().append(target)) {
outputStream.write(dataset);
} finally {
outputStream.close();
}
byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target,
dataset.length);

View File

@ -53,7 +53,7 @@ public abstract class AbstractContractConcatTest extends AbstractFSContractTestB
target = new Path(testPath, "target");
byte[] block = dataset(TEST_FILE_LEN, 0, 255);
createFile(getFileSystem(), srcFile, false, block);
createFile(getFileSystem(), srcFile, true, block);
touch(getFileSystem(), zeroByteFile);
}

View File

@ -123,7 +123,7 @@ public abstract class AbstractContractCreateTest extends
} catch (AssertionError failure) {
if (isSupported(IS_BLOBSTORE)) {
// file/directory hack surfaces here
throw new AssumptionViolatedException(failure.toString()).initCause(failure);
throw new AssumptionViolatedException(failure.toString(), failure);
}
// else: rethrow
throw failure;
@ -163,13 +163,11 @@ public abstract class AbstractContractCreateTest extends
public void testCreatedFileIsImmediatelyVisible() throws Throwable {
describe("verify that a newly created file exists as soon as open returns");
Path path = path("testCreatedFileIsImmediatelyVisible");
FSDataOutputStream out = null;
try {
out = getFileSystem().create(path,
try(FSDataOutputStream out = getFileSystem().create(path,
false,
4096,
(short) 1,
1024);
1024)) {
if (!getFileSystem().exists(path)) {
if (isSupported(IS_BLOBSTORE)) {
@ -180,8 +178,6 @@ public abstract class AbstractContractCreateTest extends
assertPathExists("expected path to be visible before anything written",
path);
}
} finally {
IOUtils.closeStream(out);
}
}
}

View File

@ -47,7 +47,7 @@ public abstract class AbstractContractDeleteTest extends
@Test
public void testDeleteNonexistentPathRecursive() throws Throwable {
Path path = path("testDeleteNonexistentPathRecursive");
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "leftover", path);
assertPathDoesNotExist("leftover", path);
ContractTestUtils.rejectRootOperation(path);
assertFalse("Returned true attempting to delete"
+ " a nonexistent path " + path,
@ -58,7 +58,7 @@ public abstract class AbstractContractDeleteTest extends
@Test
public void testDeleteNonexistentPathNonRecursive() throws Throwable {
Path path = path("testDeleteNonexistentPathNonRecursive");
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "leftover", path);
assertPathDoesNotExist("leftover", path);
ContractTestUtils.rejectRootOperation(path);
assertFalse("Returned true attempting to recursively delete"
+ " a nonexistent path " + path,
@ -81,7 +81,7 @@ public abstract class AbstractContractDeleteTest extends
//expected
handleExpectedException(expected);
}
ContractTestUtils.assertIsDirectory(getFileSystem(), path);
assertIsDirectory(path);
}
@Test
@ -92,7 +92,7 @@ public abstract class AbstractContractDeleteTest extends
ContractTestUtils.writeTextFile(getFileSystem(), file, "goodbye, world",
true);
assertDeleted(path, true);
ContractTestUtils.assertPathDoesNotExist(getFileSystem(), "not deleted", file);
assertPathDoesNotExist("not deleted", file);
}
@Test
@ -100,12 +100,11 @@ public abstract class AbstractContractDeleteTest extends
mkdirs(path("testDeleteDeepEmptyDir/d1/d2/d3/d4"));
assertDeleted(path("testDeleteDeepEmptyDir/d1/d2/d3"), true);
FileSystem fs = getFileSystem();
ContractTestUtils.assertPathDoesNotExist(fs,
assertPathDoesNotExist(
"not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3/d4"));
ContractTestUtils.assertPathDoesNotExist(fs,
assertPathDoesNotExist(
"not deleted", path("testDeleteDeepEmptyDir/d1/d2/d3"));
ContractTestUtils.assertPathExists(fs, "parent dir is deleted",
assertPathExists( "parent dir is deleted",
path("testDeleteDeepEmptyDir/d1/d2"));
}
@ -117,8 +116,7 @@ public abstract class AbstractContractDeleteTest extends
Path file = new Path(path, "childfile");
ContractTestUtils.writeTextFile(getFileSystem(), file,
"single file to be deleted.", true);
ContractTestUtils.assertPathExists(getFileSystem(),
"single file not created", file);
assertPathExists("single file not created", file);
assertDeleted(file, false);
}
}

View File

@ -67,12 +67,9 @@ public abstract class AbstractContractMkdirTest extends AbstractFSContractTestBa
boolean made = fs.mkdirs(path);
fail("mkdirs did not fail over a file but returned " + made
+ "; " + ls(path));
} catch (ParentNotDirectoryException e) {
} catch (ParentNotDirectoryException | FileAlreadyExistsException e) {
//parent is a directory
handleExpectedException(e);
} catch (FileAlreadyExistsException e) {
//also allowed as an exception (HDFS)
handleExpectedException(e);;
} catch (IOException e) {
//here the FS says "no create"
handleRelaxedException("mkdirs", "FileAlreadyExistsException", e);
@ -97,11 +94,9 @@ public abstract class AbstractContractMkdirTest extends AbstractFSContractTestBa
boolean made = fs.mkdirs(child);
fail("mkdirs did not fail over a file but returned " + made
+ "; " + ls(path));
} catch (ParentNotDirectoryException e) {
} catch (ParentNotDirectoryException | FileAlreadyExistsException e) {
//parent is a directory
handleExpectedException(e);
} catch (FileAlreadyExistsException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("mkdirs", "ParentNotDirectoryException", e);
}

View File

@ -125,10 +125,10 @@ public abstract class AbstractContractOpenTest extends AbstractFSContractTestBas
createFile(getFileSystem(), path, false, block);
//open first
FSDataInputStream instream1 = getFileSystem().open(path);
int c = instream1.read();
assertEquals(0,c);
FSDataInputStream instream2 = null;
try {
int c = instream1.read();
assertEquals(0,c);
instream2 = getFileSystem().open(path);
assertEquals("first read of instream 2", 0, instream2.read());
assertEquals("second read of instream 1", 1, instream1.read());

View File

@ -26,8 +26,7 @@ import org.junit.Test;
import java.io.FileNotFoundException;
import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
/**
* Test creating files, overwrite options &c
@ -46,9 +45,9 @@ public abstract class AbstractContractRenameTest extends
boolean rename = rename(renameSrc, renameTarget);
assertTrue("rename("+renameSrc+", "+ renameTarget+") returned false",
rename);
ContractTestUtils.assertListStatusFinds(getFileSystem(),
assertListStatusFinds(getFileSystem(),
renameTarget.getParent(), renameTarget);
ContractTestUtils.verifyFileContents(getFileSystem(), renameTarget, data);
verifyFileContents(getFileSystem(), renameTarget, data);
}
@Test
@ -129,7 +128,7 @@ public abstract class AbstractContractRenameTest extends
}
// verify that the destination file is as expected based on the expected
// outcome
ContractTestUtils.verifyFileContents(getFileSystem(), destFile,
verifyFileContents(getFileSystem(), destFile,
destUnchanged? destData: srcData);
}
@ -154,7 +153,7 @@ public abstract class AbstractContractRenameTest extends
Path renamedSrc = new Path(destDir, sourceSubdir);
assertIsFile(destFilePath);
assertIsDirectory(renamedSrc);
ContractTestUtils.verifyFileContents(fs, destFilePath, destDateset);
verifyFileContents(fs, destFilePath, destDateset);
assertTrue("rename returned false though the contents were copied", rename);
}
@ -172,10 +171,10 @@ public abstract class AbstractContractRenameTest extends
boolean rename = rename(renameSrc, renameTarget);
if (renameCreatesDestDirs) {
assertTrue(rename);
ContractTestUtils.verifyFileContents(getFileSystem(), renameTarget, data);
verifyFileContents(getFileSystem(), renameTarget, data);
} else {
assertFalse(rename);
ContractTestUtils.verifyFileContents(getFileSystem(), renameSrc, data);
verifyFileContents(getFileSystem(), renameSrc, data);
}
} catch (FileNotFoundException e) {
// allowed unless that rename flag is set
@ -191,36 +190,36 @@ public abstract class AbstractContractRenameTest extends
final Path finalDir = new Path(renameTestDir, "dest");
FileSystem fs = getFileSystem();
boolean renameRemoveEmptyDest = isSupported(RENAME_REMOVE_DEST_IF_EMPTY_DIR);
ContractTestUtils.rm(fs, renameTestDir, true, false);
rm(fs, renameTestDir, true, false);
fs.mkdirs(srcDir);
fs.mkdirs(finalDir);
ContractTestUtils.writeTextFile(fs, new Path(srcDir, "source.txt"),
writeTextFile(fs, new Path(srcDir, "source.txt"),
"this is the file in src dir", false);
ContractTestUtils.writeTextFile(fs, new Path(srcSubDir, "subfile.txt"),
writeTextFile(fs, new Path(srcSubDir, "subfile.txt"),
"this is the file in src/sub dir", false);
ContractTestUtils.assertPathExists(fs, "not created in src dir",
assertPathExists("not created in src dir",
new Path(srcDir, "source.txt"));
ContractTestUtils.assertPathExists(fs, "not created in src/sub dir",
assertPathExists("not created in src/sub dir",
new Path(srcSubDir, "subfile.txt"));
fs.rename(srcDir, finalDir);
// Accept both POSIX rename behavior and CLI rename behavior
if (renameRemoveEmptyDest) {
// POSIX rename behavior
ContractTestUtils.assertPathExists(fs, "not renamed into dest dir",
assertPathExists("not renamed into dest dir",
new Path(finalDir, "source.txt"));
ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir",
assertPathExists("not renamed into dest/sub dir",
new Path(finalDir, "sub/subfile.txt"));
} else {
// CLI rename behavior
ContractTestUtils.assertPathExists(fs, "not renamed into dest dir",
assertPathExists("not renamed into dest dir",
new Path(finalDir, "src1/source.txt"));
ContractTestUtils.assertPathExists(fs, "not renamed into dest/sub dir",
assertPathExists("not renamed into dest/sub dir",
new Path(finalDir, "src1/sub/subfile.txt"));
}
ContractTestUtils.assertPathDoesNotExist(fs, "not deleted",
assertPathDoesNotExist("not deleted",
new Path(srcDir, "source.txt"));
}
}

View File

@ -51,7 +51,7 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
Path dir = new Path("/testmkdirdepth1");
assertPathDoesNotExist("directory already exists", dir);
fs.mkdirs(dir);
ContractTestUtils.assertIsDirectory(getFileSystem(), dir);
assertIsDirectory(dir);
assertPathExists("directory already exists", dir);
assertDeleted(dir, true);
}
@ -61,10 +61,10 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
//extra sanity checks here to avoid support calls about complete loss of data
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
Path root = new Path("/");
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
assertIsDirectory(root);
boolean deleted = getFileSystem().delete(root, true);
LOG.info("rm / of empty dir result is {}", deleted);
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
assertIsDirectory(root);
}
@Test
@ -75,7 +75,7 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
String touchfile = "/testRmNonEmptyRootDirNonRecursive";
Path file = new Path(touchfile);
ContractTestUtils.touch(getFileSystem(), file);
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
assertIsDirectory(root);
try {
boolean deleted = getFileSystem().delete(root, false);
fail("non recursive delete should have raised an exception," +
@ -86,7 +86,7 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
} finally {
getFileSystem().delete(file, false);
}
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
assertIsDirectory(root);
}
@Test
@ -94,11 +94,11 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
//extra sanity checks here to avoid support calls about complete loss of data
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
Path root = new Path("/");
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
assertIsDirectory(root);
Path file = new Path("/testRmRootRecursive");
ContractTestUtils.touch(getFileSystem(), file);
boolean deleted = getFileSystem().delete(root, true);
ContractTestUtils.assertIsDirectory(getFileSystem(), root);
assertIsDirectory(root);
LOG.info("rm -rf / result is {}", deleted);
if (deleted) {
assertPathDoesNotExist("expected file to be deleted", file);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.contract;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
@ -31,9 +32,9 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.Random;
import static org.apache.hadoop.fs.contract.ContractTestUtils.cleanup;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyRead;
@ -46,7 +47,6 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
public static final int DEFAULT_RANDOM_SEEK_COUNT = 100;
private Path testPath;
private Path smallSeekFile;
private Path zeroByteFile;
private FSDataInputStream instream;
@ -56,13 +56,13 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
super.setup();
skipIfUnsupported(SUPPORTS_SEEK);
//delete the test directory
testPath = getContract().getTestPath();
smallSeekFile = path("seekfile.txt");
zeroByteFile = path("zero.txt");
byte[] block = dataset(TEST_FILE_LEN, 0, 255);
//this file now has a simple rule: offset => value
createFile(getFileSystem(), smallSeekFile, false, block);
touch(getFileSystem(), zeroByteFile);
FileSystem fs = getFileSystem();
createFile(fs, smallSeekFile, true, block);
touch(fs, zeroByteFile);
}
@Override
@ -79,6 +79,21 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
super.teardown();
}
/**
* Skip a test case if the FS doesn't support positioned readable.
* This should hold automatically if the FS supports seek, even
* if it doesn't support seeking past the EOF.
* And, because this test suite requires seek to be supported, the
* feature is automatically assumed to be true unless stated otherwise.
*/
protected void assumeSupportsPositionedReadable() throws IOException {
// because this ,
if (!getContract().isSupported(SUPPORTS_POSITIONED_READABLE, true)) {
skip("Skipping as unsupported feature: "
+ SUPPORTS_POSITIONED_READABLE);
}
}
@Test
public void testSeekZeroByteFile() throws Throwable {
describe("seek and read a 0 byte file");
@ -282,6 +297,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
public void testPositionedBulkReadDoesntChangePosition() throws Throwable {
describe(
"verify that a positioned read does not change the getPos() value");
assumeSupportsPositionedReadable();
Path testSeekFile = path("bigseekfile.txt");
byte[] block = dataset(65536, 0, 255);
createFile(getFileSystem(), testSeekFile, false, block);
@ -290,8 +306,9 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
assertTrue(-1 != instream.read());
assertEquals(40000, instream.getPos());
byte[] readBuffer = new byte[256];
instream.read(128, readBuffer, 0, readBuffer.length);
int v = 256;
byte[] readBuffer = new byte[v];
assertEquals(v, instream.read(128, readBuffer, 0, v));
//have gone back
assertEquals(40000, instream.getPos());
//content is the same too
@ -317,12 +334,11 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
Path randomSeekFile = path("testrandomseeks.bin");
createFile(getFileSystem(), randomSeekFile, false, buf);
Random r = new Random();
FSDataInputStream stm = getFileSystem().open(randomSeekFile);
// Record the sequence of seeks and reads which trigger a failure.
int[] seeks = new int[10];
int[] reads = new int[10];
try {
try (FSDataInputStream stm = getFileSystem().open(randomSeekFile)) {
for (int i = 0; i < limit; i++) {
int seekOff = r.nextInt(buf.length);
int toRead = r.nextInt(Math.min(buf.length - seekOff, 32000));
@ -336,13 +352,232 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
sb.append("Sequence of actions:\n");
for (int j = 0; j < seeks.length; j++) {
sb.append("seek @ ").append(seeks[j]).append(" ")
.append("read ").append(reads[j]).append("\n");
.append("read ").append(reads[j]).append("\n");
}
LOG.error(sb.toString());
throw afe;
} finally {
stm.close();
}
}
@Test
public void testReadFullyZeroByteFile() throws Throwable {
describe("readFully against a 0 byte file");
assumeSupportsPositionedReadable();
instream = getFileSystem().open(zeroByteFile);
assertEquals(0, instream.getPos());
byte[] buffer = new byte[1];
instream.readFully(0, buffer, 0, 0);
assertEquals(0, instream.getPos());
// seek to 0 read 0 bytes from it
instream.seek(0);
assertEquals(0, instream.read(buffer, 0, 0));
}
@Test
public void testReadFullyPastEOFZeroByteFile() throws Throwable {
assumeSupportsPositionedReadable();
describe("readFully past the EOF of a 0 byte file");
instream = getFileSystem().open(zeroByteFile);
byte[] buffer = new byte[1];
// try to read past end of file
try {
instream.readFully(0, buffer, 0, 16);
fail("Expected an exception");
} catch (IllegalArgumentException | IndexOutOfBoundsException
| EOFException e) {
// expected
}
}
@Test
public void testReadFullySmallFile() throws Throwable {
describe("readFully operations");
assumeSupportsPositionedReadable();
instream = getFileSystem().open(smallSeekFile);
byte[] buffer = new byte[256];
// expect negative length to fail
try {
instream.readFully(0, buffer, 0, -16);
fail("Expected an exception");
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// negative offset into buffer
try {
instream.readFully(0, buffer, -1, 16);
fail("Expected an exception");
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// expect negative position to fail, ideally with EOF
try {
instream.readFully(-1, buffer);
fail("Expected an exception");
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException |IllegalArgumentException | IndexOutOfBoundsException e) {
handleRelaxedException("readFully with a negative position ",
"EOFException",
e);
}
// read more than the offset allows
try {
instream.readFully(0, buffer, buffer.length - 8, 16);
fail("Expected an exception");
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// read properly
assertEquals(0, instream.getPos());
instream.readFully(0, buffer);
assertEquals(0, instream.getPos());
// now read the entire file in one go
byte[] fullFile = new byte[TEST_FILE_LEN];
instream.readFully(0, fullFile);
assertEquals(0, instream.getPos());
try {
instream.readFully(16, fullFile);
fail("Expected an exception");
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("readFully which reads past EOF ",
"EOFException",
e);
}
}
@Test
public void testReadFullyPastEOF() throws Throwable {
describe("readFully past the EOF of a file");
assumeSupportsPositionedReadable();
instream = getFileSystem().open(smallSeekFile);
byte[] buffer = new byte[256];
// now read past the end of the file
try {
instream.readFully(TEST_FILE_LEN + 1, buffer);
fail("Expected an exception");
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("readFully with an offset past EOF ",
"EOFException",
e);
}
// read zero bytes from an offset past EOF.
try {
instream.readFully(TEST_FILE_LEN + 1, buffer, 0, 0);
// a zero byte read may fail-fast
LOG.info("Filesystem short-circuits 0-byte reads");
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("readFully(0 bytes) with an offset past EOF ",
"EOFException",
e);
}
}
@Test
public void testReadFullyZeroBytebufferPastEOF() throws Throwable {
describe("readFully zero bytes from an offset past EOF");
assumeSupportsPositionedReadable();
instream = getFileSystem().open(smallSeekFile);
byte[] buffer = new byte[256];
try {
instream.readFully(TEST_FILE_LEN + 1, buffer, 0, 0);
// a zero byte read may fail-fast
LOG.info("Filesystem short-circuits 0-byte reads");
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException e) {
handleRelaxedException("readFully(0 bytes) with an offset past EOF ",
"EOFException",
e);
}
}
@Test
public void testReadNullBuffer() throws Throwable {
describe("try to read a null buffer ");
assumeSupportsPositionedReadable();
try (FSDataInputStream in = getFileSystem().open(smallSeekFile)) {
// Null buffer
int r = in.read(0, null, 0, 16);
fail("Expected an exception from a read into a null buffer, got " + r);
} catch (IllegalArgumentException e) {
// expected
}
}
@Test
public void testReadSmallFile() throws Throwable {
describe("PositionedRead.read operations");
assumeSupportsPositionedReadable();
instream = getFileSystem().open(smallSeekFile);
byte[] buffer = new byte[256];
int r;
// expect negative length to fail
try {
r = instream.read(0, buffer, 0, -16);
fail("Expected an exception, got " + r);
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// negative offset into buffer
try {
r = instream.read(0, buffer, -1, 16);
fail("Expected an exception, got " + r);
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// negative position
try {
r = instream.read(-1, buffer, 0, 16);
fail("Expected an exception, got " + r);
} catch (EOFException e) {
handleExpectedException(e);
} catch (IOException | IllegalArgumentException | IndexOutOfBoundsException e) {
handleRelaxedException("read() with a negative position ",
"EOFException",
e);
}
// read more than the offset allows
try {
r = instream.read(0, buffer, buffer.length - 8, 16);
fail("Expected an exception, got " + r);
} catch (IllegalArgumentException | IndexOutOfBoundsException e) {
// expected
}
// read properly
assertEquals(0, instream.getPos());
instream.readFully(0, buffer);
assertEquals(0, instream.getPos());
// now read the entire file in one go
byte[] fullFile = new byte[TEST_FILE_LEN];
assertEquals(TEST_FILE_LEN,
instream.read(0, fullFile, 0, fullFile.length));
assertEquals(0, instream.getPos());
// now read past the end of the file
assertEquals(-1,
instream.read(TEST_FILE_LEN + 16, buffer, 0, 1));
}
@Test
public void testReadAtExactEOF() throws Throwable {
describe("read at the end of the file");
instream = getFileSystem().open(smallSeekFile);
instream.seek(TEST_FILE_LEN -1);
assertTrue("read at last byte", instream.read() > 0);
assertEquals("read just past EOF", -1, instream.read());
}
}

View File

@ -57,7 +57,7 @@ public abstract class AbstractFSContractTestBase extends Assert
public static final int DEFAULT_TEST_TIMEOUT = 180 * 1000;
/**
* The FS contract used for these tets
* The FS contract used for these tests
*/
private AbstractFSContract contract;

View File

@ -53,20 +53,20 @@ public interface ContractOptions {
/**
* Flag to indicate that the FS can rename into directories that
* don't exist, creating them as needed.
* @{value}
* {@value}
*/
String RENAME_CREATES_DEST_DIRS = "rename-creates-dest-dirs";
/**
* Flag to indicate that the FS does not follow the rename contract -and
* instead only returns false on a failure.
* @{value}
* {@value}
*/
String RENAME_OVERWRITES_DEST = "rename-overwrites-dest";
/**
* Flag to indicate that the FS returns false if the destination exists
* @{value}
* {@value}
*/
String RENAME_RETURNS_FALSE_IF_DEST_EXISTS =
"rename-returns-false-if-dest-exists";
@ -74,7 +74,7 @@ public interface ContractOptions {
/**
* Flag to indicate that the FS returns false on a rename
* if the source is missing
* @{value}
* {@value}
*/
String RENAME_RETURNS_FALSE_IF_SOURCE_MISSING =
"rename-returns-false-if-source-missing";
@ -82,74 +82,74 @@ public interface ContractOptions {
/**
* Flag to indicate that the FS remove dest first if it is an empty directory
* mean the FS honors POSIX rename behavior.
* @{value}
* {@value}
*/
String RENAME_REMOVE_DEST_IF_EMPTY_DIR = "rename-remove-dest-if-empty-dir";
/**
* Flag to indicate that append is supported
* @{value}
* {@value}
*/
String SUPPORTS_APPEND = "supports-append";
/**
* Flag to indicate that setTimes is supported.
* @{value}
* {@value}
*/
String SUPPORTS_SETTIMES = "supports-settimes";
/**
* Flag to indicate that getFileStatus is supported.
* @{value}
* {@value}
*/
String SUPPORTS_GETFILESTATUS = "supports-getfilestatus";
/**
* Flag to indicate that renames are atomic
* @{value}
* {@value}
*/
String SUPPORTS_ATOMIC_RENAME = "supports-atomic-rename";
/**
* Flag to indicate that directory deletes are atomic
* @{value}
* {@value}
*/
String SUPPORTS_ATOMIC_DIRECTORY_DELETE = "supports-atomic-directory-delete";
/**
* Does the FS support multiple block locations?
* @{value}
* {@value}
*/
String SUPPORTS_BLOCK_LOCALITY = "supports-block-locality";
/**
* Does the FS support the concat() operation?
* @{value}
* {@value}
*/
String SUPPORTS_CONCAT = "supports-concat";
/**
* Is seeking supported at all?
* @{value}
* {@value}
*/
String SUPPORTS_SEEK = "supports-seek";
/**
* Is seeking past the EOF allowed?
* @{value}
* {@value}
*/
String REJECTS_SEEK_PAST_EOF = "rejects-seek-past-eof";
/**
* Is seeking on a closed file supported? Some filesystems only raise an
* exception later, when trying to read.
* @{value}
* {@value}
*/
String SUPPORTS_SEEK_ON_CLOSED_FILE = "supports-seek-on-closed-file";
/**
* Is available() on a closed InputStream supported?
* @{value}
* {@value}
*/
String SUPPORTS_AVAILABLE_ON_CLOSED_FILE = "supports-available-on-closed-file";
@ -157,32 +157,39 @@ public interface ContractOptions {
* Flag to indicate that this FS expects to throw the strictest
* exceptions it can, not generic IOEs, which, if returned,
* must be rejected.
* @{value}
* {@value}
*/
String SUPPORTS_STRICT_EXCEPTIONS = "supports-strict-exceptions";
/**
* Are unix permissions
* @{value}
* {@value}
*/
String SUPPORTS_UNIX_PERMISSIONS = "supports-unix-permissions";
/**
* Is positioned readable supported? Supporting seek should be sufficient
* for this.
* {@value}
*/
String SUPPORTS_POSITIONED_READABLE = "supports-positioned-readable";
/**
* Maximum path length
* @{value}
* {@value}
*/
String MAX_PATH_ = "max-path";
/**
* Maximum filesize: 0 or -1 for no limit
* @{value}
* {@value}
*/
String MAX_FILESIZE = "max-filesize";
/**
* Flag to indicate that tests on the root directories of a filesystem/
* object store are permitted
* @{value}
* {@value}
*/
String TEST_ROOT_TESTS_ENABLED = "test.root-tests-enabled";

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.internal.AssumptionViolatedException;
import org.slf4j.Logger;
@ -432,9 +433,7 @@ public class ContractTestUtils extends Assert {
* @throws AssertionError with the text and throwable -always
*/
public static void fail(String text, Throwable thrown) {
AssertionError e = new AssertionError(text);
e.initCause(thrown);
throw e;
throw new AssertionError(text, thrown);
}
/**
@ -509,10 +508,14 @@ public class ContractTestUtils extends Assert {
boolean overwrite,
byte[] data) throws IOException {
FSDataOutputStream stream = fs.create(path, overwrite);
if (data != null && data.length > 0) {
stream.write(data);
try {
if (data != null && data.length > 0) {
stream.write(data);
}
stream.close();
} finally {
IOUtils.closeStream(stream);
}
stream.close();
}
/**
@ -574,13 +577,10 @@ public class ContractTestUtils extends Assert {
public static String readBytesToString(FileSystem fs,
Path path,
int length) throws IOException {
FSDataInputStream in = fs.open(path);
try {
try (FSDataInputStream in = fs.open(path)) {
byte[] buf = new byte[length];
in.readFully(0, buf);
return toChar(buf);
} finally {
in.close();
}
}
@ -786,8 +786,7 @@ public class ContractTestUtils extends Assert {
long totalBytesRead = 0;
int nextExpectedNumber = 0;
final InputStream inputStream = fs.open(path);
try {
try (InputStream inputStream = fs.open(path)) {
while (true) {
final int bytesRead = inputStream.read(testBuffer);
if (bytesRead < 0) {
@ -814,8 +813,6 @@ public class ContractTestUtils extends Assert {
throw new IOException("Expected to read " + expectedSize +
" bytes but only received " + totalBytesRead);
}
} finally {
inputStream.close();
}
}

View File

@ -100,7 +100,7 @@ case sensitivity and permission options are determined at run time from OS type
<value>true</value>
</property>
<!-- checksum FS doesn't allow seeing past EOF -->
<!-- checksum FS doesn't allow seeking past EOF -->
<property>
<name>fs.contract.rejects-seek-past-eof</name>
<value>true</value>
@ -121,4 +121,4 @@ case sensitivity and permission options are determined at run time from OS type
<value>true</value>
</property>
</configuration>
</configuration>

View File

@ -1423,6 +1423,10 @@ public class DFSInputStream extends FSInputStream
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return 0;
}
try (TraceScope scope = dfsClient.
newReaderTraceScope("DFSInputStream#byteArrayPread",
src, position, length)) {

View File

@ -28,6 +28,7 @@ import java.util.Map;
import java.util.StringTokenizer;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import com.google.common.annotations.VisibleForTesting;
@ -128,6 +129,9 @@ public abstract class ByteRangeInputStream extends FSInputStream {
@VisibleForTesting
protected InputStreamAndFileLength openInputStream(long startOffset)
throws IOException {
if (startOffset < 0) {
throw new EOFException("Negative Position");
}
// Use the original url if no resolved url exists, eg. if
// it's the first time a request is made.
final boolean resolved = resolvedURL.getURL() != null;
@ -224,6 +228,10 @@ public abstract class ByteRangeInputStream extends FSInputStream {
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return 0;
}
try (InputStream in = openInputStream(position).in) {
return in.read(buffer, offset, length);
}
@ -232,17 +240,21 @@ public abstract class ByteRangeInputStream extends FSInputStream {
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
final InputStreamAndFileLength fin = openInputStream(position);
if (fin.length != null && length + position > fin.length) {
throw new EOFException("The length to read " + length
+ " exceeds the file length " + fin.length);
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return;
}
final InputStreamAndFileLength fin = openInputStream(position);
try {
if (fin.length != null && length + position > fin.length) {
throw new EOFException("The length to read " + length
+ " exceeds the file length " + fin.length);
}
int nread = 0;
while (nread < length) {
int nbytes = fin.in.read(buffer, offset + nread, length - nread);
if (nbytes < 0) {
throw new EOFException("End of file reached before reading fully.");
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
nread += nbytes;
}

View File

@ -137,6 +137,8 @@ public class S3AInputStream extends FSInputStream {
LOG.info("Got socket exception while trying to read from stream, trying to recover " + e);
reopen(pos);
byteRead = wrappedStream.read();
} catch (EOFException e) {
return -1;
}
if (byteRead >= 0) {
@ -216,4 +218,42 @@ public class S3AInputStream extends FSInputStream {
public boolean markSupported() {
return false;
}
/**
* Subclass {@code readFully()} operation which only seeks at the start
* of the series of operations; seeking back at the end.
*
* This is significantly higher performance if multiple read attempts are
* needed to fetch the data, as it does not break the HTTP connection.
*
* To maintain thread safety requirements, this operation is synchronized
* for the duration of the sequence.
* {@inheritDoc}
*
*/
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
validatePositionedReadArgs(position, buffer, offset, length);
if (length == 0) {
return;
}
int nread = 0;
synchronized (this) {
long oldPos = getPos();
try {
seek(position);
while (nread < length) {
int nbytes = read(buffer, offset + nread, length - nread);
if (nbytes < 0) {
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
}
nread += nbytes;
}
} finally {
seek(oldPos);
}
}
}
}

View File

@ -725,6 +725,8 @@ public class NativeAzureFileSystem extends FileSystem {
// Return to the caller with the result.
//
return result;
} catch(EOFException e) {
return -1;
} catch(IOException e) {
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(e);
@ -773,7 +775,7 @@ public class NativeAzureFileSystem extends FileSystem {
pos += result;
}
if (null != statistics) {
if (null != statistics && result > 0) {
statistics.incrementBytesRead(result);
}