diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java index a2273bf8334..2ff2be10e2b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetReadahead; @@ -62,9 +63,10 @@ import org.apache.hadoop.util.StringUtils; @InterfaceAudience.Private @InterfaceStability.Evolving public class CryptoInputStream extends FilterInputStream implements - Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, - CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, - ReadableByteChannel, CanUnbuffer, StreamCapabilities { + Seekable, PositionedReadable, ByteBufferReadable, + ByteBufferPositionedReadable, HasFileDescriptor, CanSetDropBehind, + CanSetReadahead, HasEnhancedByteBufferAccess, ReadableByteChannel, + CanUnbuffer, StreamCapabilities { private final byte[] oneByteBuf = new byte[1]; private final CryptoCodec codec; private final Decryptor decryptor; @@ -341,6 +343,24 @@ public class CryptoInputStream extends FilterInputStream implements "positioned read."); } } + + /** Positioned read using ByteBuffers. It is thread-safe */ + @Override + public int read(long position, final ByteBuffer buf) throws IOException { + checkStream(); + try { + int pos = buf.position(); + final int n = ((ByteBufferPositionedReadable) in).read(position, buf); + if (n > 0) { + // This operation does not change the current offset of the file + decrypt(position, buf, n, pos); + } + return n; + } catch (ClassCastException e) { + throw new UnsupportedOperationException( + "This stream does not support " + "positioned read."); + } + } /** * Decrypt length bytes in buffer starting at offset. Output is also put @@ -375,7 +395,80 @@ public class CryptoInputStream extends FilterInputStream implements returnDecryptor(decryptor); } } - + + /** + * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are + * decrypted from {@code buf} starting at {@code start}. + * {@code buf.position()} and {@code buf.limit()} are unchanged after this + * method returns. This method is thread-safe. + * + *

+ * This method decrypts the input buf chunk-by-chunk and writes the decrypted + * output back into the input buf. It uses two local buffers taken from the + * {@link #bufferPool} to assist in this process: one is designated as the + * input buffer and it stores a single chunk of the given buf, the other is + * designated as the output buffer, which stores the output of decrypting the + * input buffer. Both buffers are of size {@link #bufferSize}. + *

+ * + *

+ * Decryption is done by using a {@link Decryptor} and the + * {@link #decrypt(Decryptor, ByteBuffer, ByteBuffer, byte)} method. Once the + * decrypted data is written into the output buffer, is is copied back into + * buf. Both buffers are returned back into the pool once the entire buf is + * decrypted. + *

+ * + * @param filePosition the current position of the file being read + * @param buf the {@link ByteBuffer} to decrypt + * @param length the number of bytes in {@code buf} to decrypt + * @param start the position in {@code buf} to start decrypting data from + */ + private void decrypt(long filePosition, ByteBuffer buf, int length, int start) + throws IOException { + ByteBuffer localInBuffer = null; + ByteBuffer localOutBuffer = null; + + // Duplicate the buffer so we don't have to worry about resetting the + // original position and limit at the end of the method + buf = buf.duplicate(); + + int decryptedBytes = 0; + Decryptor localDecryptor = null; + try { + localInBuffer = getBuffer(); + localOutBuffer = getBuffer(); + localDecryptor = getDecryptor(); + byte[] localIV = initIV.clone(); + updateDecryptor(localDecryptor, filePosition, localIV); + byte localPadding = getPadding(filePosition); + // Set proper filePosition for inputdata. + localInBuffer.position(localPadding); + + while (decryptedBytes < length) { + buf.position(start + decryptedBytes); + buf.limit(start + decryptedBytes + + Math.min(length - decryptedBytes, localInBuffer.remaining())); + localInBuffer.put(buf); + // Do decryption + try { + decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding); + buf.position(start + decryptedBytes); + buf.limit(start + length); + decryptedBytes += localOutBuffer.remaining(); + buf.put(localOutBuffer); + } finally { + localPadding = afterDecryption(localDecryptor, localInBuffer, + filePosition + length, localIV); + } + } + } finally { + returnBuffer(localInBuffer); + returnBuffer(localOutBuffer); + returnDecryptor(localDecryptor); + } + } + /** Positioned read fully. It is thread-safe */ @Override public void readFully(long position, byte[] buffer, int offset, int length) @@ -740,6 +833,8 @@ public class CryptoInputStream extends FilterInputStream implements case StreamCapabilities.READAHEAD: case StreamCapabilities.DROPBEHIND: case StreamCapabilities.UNBUFFER: + case StreamCapabilities.READBYTEBUFFER: + case StreamCapabilities.PREADBYTEBUFFER: return true; default: return false; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java new file mode 100644 index 00000000000..3506870ac2a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Implementers of this interface provide a positioned read API that writes to a + * {@link ByteBuffer} rather than a {@code byte[]}. + * + * @see PositionedReadable + * @see ByteBufferReadable + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface ByteBufferPositionedReadable { + /** + * Reads up to {@code buf.remaining()} bytes into buf from a given position in + * the file and returns the number of bytes read. Callers should use + * {@code buf.limit(...)} to control the size of the desired read and + * {@code buf.position(...)} to control the offset into the buffer the data + * should be written to. + *

+ * After a successful call, {@code buf.position()} will be advanced by the + * number of bytes read and {@code buf.limit()} will be unchanged. + *

+ * In the case of an exception, the state of the buffer (the contents of the + * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is + * undefined, and callers should be prepared to recover from this eventuality. + *

+ * Callers should use {@link StreamCapabilities#hasCapability(String)} with + * {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying + * stream supports this interface, otherwise they might get a + * {@link UnsupportedOperationException}. + *

+ * Implementations should treat 0-length requests as legitimate, and must not + * signal an error upon their receipt. + * + * @param position position within file + * @param buf the ByteBuffer to receive the results of the read operation. + * @return the number of bytes read, possibly zero, or -1 if reached + * end-of-stream + * @throws IOException if there is some error performing the read + */ + int read(long position, ByteBuffer buf) throws IOException; +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index 08d71f16c07..4868479f0eb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -38,7 +38,8 @@ import org.apache.hadoop.util.IdentityHashStore; public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, - HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities { + HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities, + ByteBufferPositionedReadable { /** * Map ByteBuffers that we have handed out to readers to ByteBufferPool * objects @@ -246,4 +247,13 @@ public class FSDataInputStream extends DataInputStream public String toString() { return super.toString() + ": " + in; } + + @Override + public int read(long position, ByteBuffer buf) throws IOException { + if (in instanceof ByteBufferPositionedReadable) { + return ((ByteBufferPositionedReadable)in).read(position, buf); + } + throw new UnsupportedOperationException("Byte-buffer pread unsupported " + + "by input stream"); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 3549cdc4fa3..e26acd4dddf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -59,6 +59,17 @@ public interface StreamCapabilities { */ String UNBUFFER = "in:unbuffer"; + /** + * Stream read(ByteBuffer) capability implemented by + * {@link ByteBufferReadable#read(java.nio.ByteBuffer)}. + */ + String READBYTEBUFFER = "in:readbytebuffer"; + + /** + * Stream read(long, ByteBuffer) capability implemented by + * {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}. + */ + String PREADBYTEBUFFER = "in:preadbytebuffer"; /** * Capabilities that a stream can support and be queried for. */ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java index a0eb1058338..502abfc7602 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java @@ -26,6 +26,7 @@ import java.nio.ByteOrder; import java.util.EnumSet; import java.util.Random; +import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSDataOutputStream; @@ -129,6 +130,32 @@ public abstract class CryptoStreamsTestBase { Assert.assertArrayEquals(result, expectedData); } + private int byteBufferPreadAll(ByteBufferPositionedReadable in, + ByteBuffer buf) throws IOException { + int n = 0; + int total = 0; + while (n != -1) { + total += n; + if (!buf.hasRemaining()) { + break; + } + n = in.read(total, buf); + } + + return total; + } + + private void byteBufferPreadCheck(ByteBufferPositionedReadable in) + throws Exception { + ByteBuffer result = ByteBuffer.allocate(dataLen); + int n = byteBufferPreadAll(in, result); + + Assert.assertEquals(dataLen, n); + ByteBuffer expectedData = ByteBuffer.allocate(n); + expectedData.put(data, 0, n); + Assert.assertArrayEquals(result.array(), expectedData.array()); + } + protected OutputStream getOutputStream(int bufferSize) throws IOException { return getOutputStream(bufferSize, key, iv); } @@ -288,20 +315,36 @@ public abstract class CryptoStreamsTestBase { return total; } + + private int readAll(InputStream in, long pos, ByteBuffer buf) + throws IOException { + int n = 0; + int total = 0; + while (n != -1) { + total += n; + if (!buf.hasRemaining()) { + break; + } + n = ((ByteBufferPositionedReadable) in).read(pos + total, buf); + } + + return total; + } /** Test positioned read. */ @Test(timeout=120000) public void testPositionedRead() throws Exception { - OutputStream out = getOutputStream(defaultBufferSize); - writeData(out); + try (OutputStream out = getOutputStream(defaultBufferSize)) { + writeData(out); + } - InputStream in = getInputStream(defaultBufferSize); - // Pos: 1/3 dataLen - positionedReadCheck(in , dataLen / 3); + try (InputStream in = getInputStream(defaultBufferSize)) { + // Pos: 1/3 dataLen + positionedReadCheck(in, dataLen / 3); - // Pos: 1/2 dataLen - positionedReadCheck(in, dataLen / 2); - in.close(); + // Pos: 1/2 dataLen + positionedReadCheck(in, dataLen / 2); + } } private void positionedReadCheck(InputStream in, int pos) throws Exception { @@ -315,6 +358,35 @@ public abstract class CryptoStreamsTestBase { System.arraycopy(data, pos, expectedData, 0, n); Assert.assertArrayEquals(readData, expectedData); } + + /** Test positioned read with ByteBuffers. */ + @Test(timeout=120000) + public void testPositionedReadWithByteBuffer() throws Exception { + try (OutputStream out = getOutputStream(defaultBufferSize)) { + writeData(out); + } + + try (InputStream in = getInputStream(defaultBufferSize)) { + // Pos: 1/3 dataLen + positionedReadCheckWithByteBuffer(in, dataLen / 3); + + // Pos: 1/2 dataLen + positionedReadCheckWithByteBuffer(in, dataLen / 2); + } + } + + private void positionedReadCheckWithByteBuffer(InputStream in, int pos) + throws Exception { + ByteBuffer result = ByteBuffer.allocate(dataLen); + int n = readAll(in, pos, result); + + Assert.assertEquals(dataLen, n + pos); + byte[] readData = new byte[n]; + System.arraycopy(result.array(), 0, readData, 0, n); + byte[] expectedData = new byte[n]; + System.arraycopy(data, pos, expectedData, 0, n); + Assert.assertArrayEquals(readData, expectedData); + } /** Test read fully */ @Test(timeout=120000) @@ -505,12 +577,40 @@ public abstract class CryptoStreamsTestBase { System.arraycopy(data, 0, expectedData, 0, n); Assert.assertArrayEquals(readData, expectedData); } + + private void byteBufferPreadCheck(InputStream in, ByteBuffer buf, + int bufPos) throws Exception { + // Test reading from position 0 + buf.position(bufPos); + int n = ((ByteBufferPositionedReadable) in).read(0, buf); + Assert.assertEquals(bufPos + n, buf.position()); + byte[] readData = new byte[n]; + buf.rewind(); + buf.position(bufPos); + buf.get(readData); + byte[] expectedData = new byte[n]; + System.arraycopy(data, 0, expectedData, 0, n); + Assert.assertArrayEquals(readData, expectedData); + + // Test reading from half way through the data + buf.position(bufPos); + n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf); + Assert.assertEquals(bufPos + n, buf.position()); + readData = new byte[n]; + buf.rewind(); + buf.position(bufPos); + buf.get(readData); + expectedData = new byte[n]; + System.arraycopy(data, dataLen / 2, expectedData, 0, n); + Assert.assertArrayEquals(readData, expectedData); + } /** Test byte buffer read with different buffer size. */ @Test(timeout=120000) public void testByteBufferRead() throws Exception { - OutputStream out = getOutputStream(defaultBufferSize); - writeData(out); + try (OutputStream out = getOutputStream(defaultBufferSize)) { + writeData(out); + } // Default buffer size, initial buffer position is 0 InputStream in = getInputStream(defaultBufferSize); @@ -560,6 +660,53 @@ public abstract class CryptoStreamsTestBase { byteBufferReadCheck(in, buf, 11); in.close(); } + + /** Test byte buffer pread with different buffer size. */ + @Test(timeout=120000) + public void testByteBufferPread() throws Exception { + try (OutputStream out = getOutputStream(defaultBufferSize)) { + writeData(out); + } + + try (InputStream defaultBuf = getInputStream(defaultBufferSize); + InputStream smallBuf = getInputStream(smallBufferSize)) { + + ByteBuffer buf = ByteBuffer.allocate(dataLen + 100); + + // Default buffer size, initial buffer position is 0 + byteBufferPreadCheck(defaultBuf, buf, 0); + + // Default buffer size, initial buffer position is not 0 + buf.clear(); + byteBufferPreadCheck(defaultBuf, buf, 11); + + // Small buffer size, initial buffer position is 0 + buf.clear(); + byteBufferPreadCheck(smallBuf, buf, 0); + + // Small buffer size, initial buffer position is not 0 + buf.clear(); + byteBufferPreadCheck(smallBuf, buf, 11); + + // Test with direct ByteBuffer + buf = ByteBuffer.allocateDirect(dataLen + 100); + + // Direct buffer, default buffer size, initial buffer position is 0 + byteBufferPreadCheck(defaultBuf, buf, 0); + + // Direct buffer, default buffer size, initial buffer position is not 0 + buf.clear(); + byteBufferPreadCheck(defaultBuf, buf, 11); + + // Direct buffer, small buffer size, initial buffer position is 0 + buf.clear(); + byteBufferPreadCheck(smallBuf, buf, 0); + + // Direct buffer, small buffer size, initial buffer position is not 0 + buf.clear(); + byteBufferPreadCheck(smallBuf, buf, 11); + } + } @Test(timeout=120000) public void testCombinedOp() throws Exception { @@ -782,15 +929,15 @@ public abstract class CryptoStreamsTestBase { // Test pread try (InputStream in = getInputStream(smallBufferSize)) { - if (in instanceof PositionedReadable) { - PositionedReadable pin = (PositionedReadable) in; + if (in instanceof ByteBufferPositionedReadable) { + ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in; // Test unbuffer after pread - preadCheck(pin); + byteBufferPreadCheck(bbpin); ((CanUnbuffer) in).unbuffer(); // Test pread again after unbuffer - preadCheck(pin); + byteBufferPreadCheck(bbpin); // Test close after unbuffer ((CanUnbuffer) in).unbuffer(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java index 28bacc6142e..d524fcfa6a0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.EnumSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetReadahead; @@ -184,9 +185,9 @@ public class TestCryptoStreams extends CryptoStreamsTestBase { static class FakeInputStream extends InputStream implements Seekable, PositionedReadable, ByteBufferReadable, - HasFileDescriptor, CanSetDropBehind, CanSetReadahead, - HasEnhancedByteBufferAccess, CanUnbuffer, - StreamCapabilities { + ByteBufferPositionedReadable, HasFileDescriptor, CanSetDropBehind, + CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer, + StreamCapabilities { private final byte[] oneByteBuf = new byte[1]; private int pos = 0; private final byte[] data; @@ -309,6 +310,31 @@ public class TestCryptoStreams extends CryptoStreamsTestBase { return -1; } + @Override + public int read(long position, ByteBuffer buf) throws IOException { + if (buf == null) { + throw new NullPointerException(); + } else if (!buf.hasRemaining()) { + return 0; + } + + if (position > length) { + throw new IOException("Cannot read after EOF."); + } + if (position < 0) { + throw new IOException("Cannot read to negative offset."); + } + + checkStream(); + + if (position < length) { + int n = (int) Math.min(buf.remaining(), length - position); + buf.put(data, (int) position, n); + return n; + } + return -1; + } + @Override public void readFully(long position, byte[] b, int off, int len) throws IOException { @@ -384,6 +410,8 @@ public class TestCryptoStreams extends CryptoStreamsTestBase { case StreamCapabilities.READAHEAD: case StreamCapabilities.DROPBEHIND: case StreamCapabilities.UNBUFFER: + case StreamCapabilities.READBYTEBUFFER: + case StreamCapabilities.PREADBYTEBUFFER: return true; default: return false; @@ -438,6 +466,8 @@ public class TestCryptoStreams extends CryptoStreamsTestBase { assertTrue(cis.hasCapability(StreamCapabilities.DROPBEHIND)); assertTrue(cis.hasCapability(StreamCapabilities.READAHEAD)); assertTrue(cis.hasCapability(StreamCapabilities.UNBUFFER)); + assertTrue(cis.hasCapability(StreamCapabilities.READBYTEBUFFER)); + assertTrue(cis.hasCapability(StreamCapabilities.PREADBYTEBUFFER)); assertFalse(cis.hasCapability(StreamCapabilities.HFLUSH)); assertFalse(cis.hasCapability(StreamCapabilities.HSYNC)); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java index bb3fd7a68d7..e7d922e78a6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java @@ -90,11 +90,21 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase { @Override @Test(timeout=10000) public void testByteBufferRead() throws Exception {} + + @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable") + @Override + @Test(timeout=10000) + public void testPositionedReadWithByteBuffer() throws IOException {} @Ignore("ChecksumFSOutputSummer doesn't support Syncable") @Override @Test(timeout=10000) public void testSyncable() throws IOException {} + + @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable") + @Override + @Test(timeout=10000) + public void testByteBufferPread() throws IOException {} @Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read") @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java index 7e300777a37..036706f435a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java @@ -91,6 +91,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase { @Test(timeout=10000) public void testPositionedRead() throws IOException {} + @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable") + @Override + @Test(timeout=10000) + public void testPositionedReadWithByteBuffer() throws IOException {} + @Ignore("Wrapped stream doesn't support ReadFully") @Override @Test(timeout=10000) @@ -105,6 +110,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase { @Override @Test(timeout=10000) public void testByteBufferRead() throws IOException {} + + @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable") + @Override + @Test(timeout=10000) + public void testByteBufferPread() throws IOException {} @Ignore("Wrapped stream doesn't support ByteBufferRead, Seek") @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index e7b68492d51..51f21f54fd3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ByteBufferUtil; import org.apache.hadoop.fs.CanSetDropBehind; @@ -100,7 +101,8 @@ import javax.annotation.Nonnull; @InterfaceAudience.Private public class DFSInputStream extends FSInputStream implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, - HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities { + HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities, + ByteBufferPositionedReadable { @VisibleForTesting public static boolean tcpReadsDisabledForTesting = false; private long hedgedReadOpsLoopNumForTesting = 0; @@ -1761,6 +1763,14 @@ public class DFSInputStream extends FSInputStream throw new IOException("Mark/reset not supported"); } + @Override + public int read(long position, final ByteBuffer buf) throws IOException { + if (!buf.hasRemaining()) { + return 0; + } + return pread(position, buf); + } + /** Utility class to encapsulate data node info and its address. */ static final class DNAddrPair { final DatanodeInfo info; @@ -1983,6 +1993,8 @@ public class DFSInputStream extends FSInputStream case StreamCapabilities.READAHEAD: case StreamCapabilities.DROPBEHIND: case StreamCapabilities.UNBUFFER: + case StreamCapabilities.READBYTEBUFFER: + case StreamCapabilities.PREADBYTEBUFFER: return true; default: return false; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h index 0eab9a68aea..f00326317f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/hdfs_test.h @@ -49,6 +49,24 @@ extern "C" { */ void hdfsFileDisableDirectRead(struct hdfsFile_internal *file); + /** + * Determine if a file is using the "direct pread" optimization. + * + * @param file The HDFS file + * @return 1 if the file is using the direct pread optimization, + * 0 otherwise. + */ + int hdfsFileUsesDirectPread(struct hdfsFile_internal *file); + + /** + * Disable the direct pread optimization for a file. + * + * This is mainly provided for unit testing purposes. + * + * @param file The HDFS file + */ + void hdfsFileDisableDirectPread(struct hdfsFile_internal *file); + /** * Disable domain socket security checks. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c index d69aa377948..26475637e24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_ops.c @@ -75,9 +75,9 @@ int main(int argc, char **argv) { const char *userPath = "/tmp/usertestfile.txt"; char buffer[32], buffer2[256], rdbuffer[32]; - tSize num_written_bytes, num_read_bytes; + tSize num_written_bytes, num_read_bytes, num_pread_bytes; hdfsFS fs, lfs; - hdfsFile writeFile, readFile, localFile, appendFile, userFile; + hdfsFile writeFile, readFile, preadFile, localFile, appendFile, userFile; tOffset currentPos, seekPos; int exists, totalResult, result, numEntries, i, j; const char *resp; @@ -206,6 +206,7 @@ int main(int argc, char **argv) { } fprintf(stderr, "Read (direct) following %d bytes:\n%s\n", num_read_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); if (hdfsSeek(fs, readFile, 0L)) { fprintf(stderr, "Failed to seek to file start!\n"); exit(-1); @@ -215,18 +216,17 @@ int main(int argc, char **argv) { // read path hdfsFileDisableDirectRead(readFile); - num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, + num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, sizeof(buffer)); - fprintf(stderr, "Read following %d bytes:\n%s\n", + if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { + fprintf(stderr, "Failed to read. Expected %s but got %s (%d bytes)\n", + fileContents, buffer, num_read_bytes); + exit(-1); + } + fprintf(stderr, "Read following %d bytes:\n%s\n", num_read_bytes, buffer); - memset(buffer, 0, strlen(fileContents + 1)); - num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer, - sizeof(buffer)); - fprintf(stderr, "Read following %d bytes:\n%s\n", - num_read_bytes, buffer); - hdfsCloseFile(fs, readFile); // Test correct behaviour for unsupported filesystems @@ -251,6 +251,104 @@ int main(int argc, char **argv) { hdfsCloseFile(lfs, localFile); } + { + // Pread tests + + exists = hdfsExists(fs, readPath); + + if (exists) { + fprintf(stderr, "Failed to validate existence of %s\n", readPath); + exit(-1); + } + + preadFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0); + if (!preadFile) { + fprintf(stderr, "Failed to open %s for reading!\n", readPath); + exit(-1); + } + + if (!hdfsFileIsOpenForRead(preadFile)) { + fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file " + "with O_RDONLY, and it did not show up as 'open for " + "read'\n"); + exit(-1); + } + + fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, preadFile)); + + num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer)); + if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { + fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n", + fileContents, buffer, num_read_bytes); + exit(-1); + } + fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", + num_pread_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); + if (hdfsTell(fs, preadFile) != 0) { + fprintf(stderr, "Pread changed position of file\n"); + exit(-1); + } + + // Test pread midway through the file rather than at the beginning + const char *fileContentsChunk = "World!"; + num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer)); + if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) { + fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n", + fileContentsChunk, buffer, num_read_bytes); + exit(-1); + } + fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); + if (hdfsTell(fs, preadFile) != 0) { + fprintf(stderr, "Pread changed position of file\n"); + exit(-1); + } + + // Disable the direct pread path so that we really go through the slow + // read path + hdfsFileDisableDirectPread(preadFile); + + num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer)); + if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) { + fprintf(stderr, "Failed to pread. Expected %s but got %s (%d bytes)\n", + fileContents, buffer, num_pread_bytes); + exit(-1); + } + fprintf(stderr, "Pread following %d bytes:\n%s\n", num_pread_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); + if (hdfsTell(fs, preadFile) != 0) { + fprintf(stderr, "Pread changed position of file\n"); + exit(-1); + } + + num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer)); + if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) { + fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n", + fileContentsChunk, buffer, num_read_bytes); + exit(-1); + } + fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer); + memset(buffer, 0, strlen(fileContents + 1)); + if (hdfsTell(fs, preadFile) != 0) { + fprintf(stderr, "Pread changed position of file\n"); + exit(-1); + } + + hdfsCloseFile(fs, preadFile); + + // Test correct behaviour for unsupported filesystems + localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0); + + if (hdfsFileUsesDirectPread(localFile)) { + fprintf(stderr, "Direct pread support incorrectly detected for local " + "filesystem\n"); + exit(-1); + } + + hdfsCloseFile(lfs, localFile); + } + totalResult = 0; result = 0; { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c index ac4bca6758d..e3cd5806edb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/hdfs.c @@ -38,6 +38,7 @@ #define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream" #define HADOOP_STAT "org/apache/hadoop/fs/FileStatus" #define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission" +#define HADOOP_FS_DATA_INPUT_STREAM "org/apache/hadoop/fs/FSDataInputStream" #define JAVA_NET_ISA "java/net/InetSocketAddress" #define JAVA_NET_URI "java/net/URI" #define JAVA_STRING "java/lang/String" @@ -56,8 +57,23 @@ // Bit fields for hdfsFile_internal flags #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0) +#define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<0) +/** + * Reads bytes using the read(ByteBuffer) API. By using Java + * DirectByteBuffers we can avoid copying the bytes from kernel space into + * user space. + */ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length); + +/** + * Reads bytes using the read(long, ByteBuffer) API. By using Java + * DirectByteBuffers we can avoid copying the bytes from kernel space into + * user space. + */ +tSize preadDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, + tSize length); + static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo); /** @@ -235,6 +251,16 @@ void hdfsFileDisableDirectRead(hdfsFile file) file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ; } +int hdfsFileUsesDirectPread(hdfsFile file) +{ + return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD); +} + +void hdfsFileDisableDirectPread(hdfsFile file) +{ + file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_PREAD; +} + int hdfsDisableDomainSocketSecurity(void) { jthrowable jthr; @@ -922,6 +948,62 @@ int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld, return 0; } +/** + * Delegates to FsDataInputStream#hasCapability(String). Used to check if a + * given input stream supports certain methods, such as + * ByteBufferReadable#read(ByteBuffer). + * + * @param jFile the FsDataInputStream to call hasCapability on + * @param capability the name of the capability to query; for a full list of + * possible values see StreamCapabilities + * + * @return true if the given jFile has the given capability, false otherwise + * + * @see org.apache.hadoop.fs.StreamCapabilities + */ +static int hdfsHasStreamCapability(jobject jFile, + const char *capability) { + int ret = 0; + jthrowable jthr = NULL; + jvalue jVal; + jstring jCapabilityString = NULL; + + /* Get the JNIEnv* corresponding to current thread */ + JNIEnv* env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return 0; + } + + jthr = newJavaStr(env, capability, &jCapabilityString); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsHasStreamCapability(%s): newJavaStr", capability); + goto done; + } + jthr = invokeMethod(env, &jVal, INSTANCE, jFile, + HADOOP_FS_DATA_INPUT_STREAM, "hasCapability", "(Ljava/lang/String;)Z", + jCapabilityString); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsHasStreamCapability(%s): FSDataInputStream#hasCapability", + capability); + goto done; + } + +done: + destroyLocalReference(env, jthr); + destroyLocalReference(env, jCapabilityString); + if (ret) { + errno = ret; + return 0; + } + if (jVal.z == JNI_TRUE) { + return 1; + } + return 0; +} + static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, int32_t bufferSize, int16_t replication, int64_t blockSize) { @@ -932,7 +1014,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, return f{is|os}; */ int accmode = flags & O_ACCMODE; - jstring jStrBufferSize = NULL, jStrReplication = NULL; + jstring jStrBufferSize = NULL, jStrReplication = NULL, jCapabilityString = NULL; jobject jConfiguration = NULL, jPath = NULL, jFile = NULL; jobject jFS = (jobject)fs; jthrowable jthr; @@ -1090,16 +1172,16 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags, file->flags = 0; if ((flags & O_WRONLY) == 0) { - // Try a test read to see if we can do direct reads - char buf; - if (readDirect(fs, file, &buf, 0) == 0) { - // Success - 0-byte read should return 0 + // Check the StreamCapabilities of jFile to see if we can do direct + // reads + if (hdfsHasStreamCapability(jFile, "in:readbytebuffer")) { file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ; - } else if (errno != ENOTSUP) { - // Unexpected error. Clear it, don't set the direct flag. - fprintf(stderr, - "hdfsOpenFile(%s): WARN: Unexpected error %d when testing " - "for direct read compatibility\n", path, errno); + } + + // Check the StreamCapabilities of jFile to see if we can do direct + // preads + if (hdfsHasStreamCapability(jFile, "in:preadbytebuffer")) { + file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD; } } ret = 0; @@ -1109,7 +1191,8 @@ done: destroyLocalReference(env, jStrReplication); destroyLocalReference(env, jConfiguration); destroyLocalReference(env, jPath); - destroyLocalReference(env, jFile); + destroyLocalReference(env, jFile); + destroyLocalReference(env, jCapabilityString); if (ret) { if (file) { if (file->file) { @@ -1311,6 +1394,12 @@ static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f, return 0; } +/** + * If the underlying stream supports the ByteBufferReadable interface then + * this method will transparently use read(ByteBuffer). This can help + * improve performance as it avoids unnecessary copies between the kernel + * space, the Java process space, and the C process space. + */ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) { jobject jInputStream; @@ -1423,6 +1512,12 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length) return (jVal.i < 0) ? 0 : jVal.i; } +/** + * If the underlying stream supports the ByteBufferPositionedReadable + * interface then this method will transparently use read(long, ByteBuffer). + * This can help improve performance as it avoids unnecessary copies between + * the kernel space, the Java process space, and the C process space. + */ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, tSize length) { @@ -1442,6 +1537,10 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, return -1; } + if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) { + return preadDirect(fs, f, position, buffer, length); + } + env = getJNIEnv(); if (env == NULL) { errno = EINTERNAL; @@ -1491,6 +1590,60 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, return jVal.i; } +tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, + tSize length) +{ + // JAVA EQUIVALENT: + // ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer + // fis.read(position, buf); + + jvalue jVal; + jthrowable jthr; + jobject bb; + + //Get the JNIEnv* corresponding to current thread + JNIEnv* env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return -1; + } + + //Error checking... make sure that this file is 'readable' + if (f->type != HDFS_STREAM_INPUT) { + fprintf(stderr, "Cannot read from a non-InputStream object!\n"); + errno = EINVAL; + return -1; + } + + //Read the requisite bytes + bb = (*env)->NewDirectByteBuffer(env, buffer, length); + if (bb == NULL) { + errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL, + "readDirect: NewDirectByteBuffer"); + return -1; + } + + jthr = invokeMethod(env, &jVal, INSTANCE, f->file, + HADOOP_FS_DATA_INPUT_STREAM, "read", "(JLjava/nio/ByteBuffer;)I", + position, bb); + destroyLocalReference(env, bb); + if (jthr) { + errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "preadDirect: FSDataInputStream#read"); + return -1; + } + // Reached EOF, return 0 + if (jVal.i < 0) { + return 0; + } + // 0 bytes read, return error + if (jVal.i == 0) { + errno = EINTR; + return -1; + } + return jVal.i; +} + tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length) { // JAVA EQUIVALENT diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java new file mode 100644 index 00000000000..4547db1c98e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteBufferPread.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * This class tests the DFS positional read functionality on a single node + * mini-cluster. These tests are inspired from {@link TestPread}. The tests + * are much less comprehensive than other pread tests because pread already + * internally uses {@link ByteBuffer}s. + */ +public class TestByteBufferPread { + + private static MiniDFSCluster cluster; + private static FileSystem fs; + private static byte[] fileContents; + private static Path testFile; + private static Random rand; + + private static final long SEED = 0xDEADBEEFL; + private static final int BLOCK_SIZE = 4096; + private static final int FILE_SIZE = 12 * BLOCK_SIZE; + + @BeforeClass + public static void setup() throws IOException { + // Setup the cluster with a small block size so we can create small files + // that span multiple blocks + Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + fs = cluster.getFileSystem(); + + // Create a test file that spans 12 blocks, and contains a bunch of random + // bytes + fileContents = new byte[FILE_SIZE]; + rand = new Random(SEED); + rand.nextBytes(fileContents); + testFile = new Path("/byte-buffer-pread-test.dat"); + try (FSDataOutputStream out = fs.create(testFile, (short) 3)) { + out.write(fileContents); + } + } + + /** + * Test preads with {@link java.nio.HeapByteBuffer}s. + */ + @Test + public void testPreadWithHeapByteBuffer() throws IOException { + testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE)); + } + + /** + * Test preads with {@link java.nio.DirectByteBuffer}s. + */ + @Test + public void testPreadWithDirectByteBuffer() throws IOException { + testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); + } + + /** + * Reads the entire testFile using the pread API and validates that its + * contents are properly loaded into the supplied {@link ByteBuffer}. + */ + private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException { + int bytesRead; + int totalBytesRead = 0; + try (FSDataInputStream in = fs.open(testFile)) { + while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { + totalBytesRead += bytesRead; + // Check that each call to read changes the position of the ByteBuffer + // correctly + assertEquals(totalBytesRead, buffer.position()); + } + + // Make sure the buffer is full + assertFalse(buffer.hasRemaining()); + // Make sure the contents of the read buffer equal the contents of the + // file + buffer.position(0); + byte[] bufferContents = new byte[FILE_SIZE]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, fileContents); + buffer.position(buffer.limit()); + } + } + + /** + * Attempts to read the testFile into a {@link ByteBuffer} that is already + * full, and validates that doing so does not change the contents of the + * supplied {@link ByteBuffer}. + */ + private void testPreadWithFullByteBuffer(ByteBuffer buffer) + throws IOException { + // Load some dummy data into the buffer + byte[] existingBufferBytes = new byte[FILE_SIZE]; + rand.nextBytes(existingBufferBytes); + buffer.put(existingBufferBytes); + // Make sure the buffer is full + assertFalse(buffer.hasRemaining()); + + try (FSDataInputStream in = fs.open(testFile)) { + // Attempt to read into the buffer, 0 bytes should be read since the + // buffer is full + assertEquals(0, in.read(buffer)); + + // Double check the buffer is still full and its contents have not + // changed + assertFalse(buffer.hasRemaining()); + buffer.position(0); + byte[] bufferContents = new byte[FILE_SIZE]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, existingBufferBytes); + } + } + + /** + * Reads half of the testFile into the {@link ByteBuffer} by setting a + * {@link ByteBuffer#limit} on the buffer. Validates that only half of the + * testFile is loaded into the buffer. + */ + private void testPreadWithLimitedByteBuffer( + ByteBuffer buffer) throws IOException { + int bytesRead; + int totalBytesRead = 0; + // Set the buffer limit to half the size of the file + buffer.limit(FILE_SIZE / 2); + + try (FSDataInputStream in = fs.open(testFile)) { + while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { + totalBytesRead += bytesRead; + // Check that each call to read changes the position of the ByteBuffer + // correctly + assertEquals(totalBytesRead, buffer.position()); + } + + // Since we set the buffer limit to half the size of the file, we should + // have only read half of the file into the buffer + assertEquals(totalBytesRead, FILE_SIZE / 2); + // Check that the buffer is full and the contents equal the first half of + // the file + assertFalse(buffer.hasRemaining()); + buffer.position(0); + byte[] bufferContents = new byte[FILE_SIZE / 2]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, + Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2)); + } + } + + /** + * Reads half of the testFile into the {@link ByteBuffer} by setting the + * {@link ByteBuffer#position} the half the size of the file. Validates that + * only half of the testFile is loaded into the buffer. + */ + private void testPreadWithPositionedByteBuffer( + ByteBuffer buffer) throws IOException { + int bytesRead; + int totalBytesRead = 0; + // Set the buffer position to half the size of the file + buffer.position(FILE_SIZE / 2); + + try (FSDataInputStream in = fs.open(testFile)) { + while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { + totalBytesRead += bytesRead; + // Check that each call to read changes the position of the ByteBuffer + // correctly + assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position()); + } + + // Since we set the buffer position to half the size of the file, we + // should have only read half of the file into the buffer + assertEquals(totalBytesRead, FILE_SIZE / 2); + // Check that the buffer is full and the contents equal the first half of + // the file + assertFalse(buffer.hasRemaining()); + buffer.position(FILE_SIZE / 2); + byte[] bufferContents = new byte[FILE_SIZE / 2]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, + Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2)); + } + } + + /** + * Reads half of the testFile into the {@link ByteBuffer} by specifying a + * position for the pread API that is half of the file size. Validates that + * only half of the testFile is loaded into the buffer. + */ + private void testPositionedPreadWithByteBuffer( + ByteBuffer buffer) throws IOException { + int bytesRead; + int totalBytesRead = 0; + + try (FSDataInputStream in = fs.open(testFile)) { + // Start reading from halfway through the file + while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2, + buffer)) > 0) { + totalBytesRead += bytesRead; + // Check that each call to read changes the position of the ByteBuffer + // correctly + assertEquals(totalBytesRead, buffer.position()); + } + + // Since we starting reading halfway through the file, the buffer should + // only be half full + assertEquals(totalBytesRead, FILE_SIZE / 2); + assertEquals(buffer.position(), FILE_SIZE / 2); + assertTrue(buffer.hasRemaining()); + // Check that the buffer contents equal the second half of the file + buffer.position(0); + byte[] bufferContents = new byte[FILE_SIZE / 2]; + buffer.get(bufferContents); + assertArrayEquals(bufferContents, + Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE)); + } + } + + @AfterClass + public static void shutdown() throws IOException { + try { + fs.delete(testFile, false); + fs.close(); + } finally { + cluster.shutdown(true); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java index 93486c07ddd..d39b9becc36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitLocalRead.java @@ -622,12 +622,11 @@ public class TestShortCircuitLocalRead { stm.write(fileData); stm.close(); try { - checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, - conf, shortCircuitFails); - //RemoteBlockReader have unsupported method read(ByteBuffer bf) - assertTrue( - "RemoteBlockReader unsupported method read(ByteBuffer bf) error", - checkUnsupportedMethod(fs, file1, fileData, readOffset)); + checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, conf, + shortCircuitFails); + assertFalse(true); + } catch (UnsupportedOperationException unex) { + // RemoteBlockReader have unsupported method read(ByteBuffer bf) } catch(IOException e) { throw new IOException( "doTestShortCircuitReadWithRemoteBlockReader ex error ", e); @@ -639,18 +638,4 @@ public class TestShortCircuitLocalRead { } } - private boolean checkUnsupportedMethod(FileSystem fs, Path file, - byte[] expected, int readOffset) throws IOException { - HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file); - ByteBuffer actual = - ByteBuffer.allocateDirect(expected.length - readOffset); - IOUtils.skipFully(stm, readOffset); - try { - stm.read(actual); - } catch(UnsupportedOperationException unex) { - return true; - } - return false; - } - }