HDFS-14483 Backport HDFS-3246,HDFS-14111 ByteBuffer pread interface to branch-2.9

This is a revert of a revert, i.e. a reapplication of the HDFS-14483
patch. The original commit was missing the JIRA number.

Revert "Revert "Backport HDFS-3246,HDFS-14111 ByteBuffer pread interface to branch-2.9""

This reverts commit b26ad8a4c3.

commit a7c6d25dc3 (parent b26ad8a4c3)
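
In short, the backport introduces a ByteBufferPositionedReadable interface and two new stream capabilities (in:readbytebuffer, in:preadbytebuffer), implements them in CryptoInputStream, FSDataInputStream, and DFSInputStream, and teaches libhdfs to use the ByteBuffer pread path. A minimal caller-side sketch using only APIs added in the hunks below (the file path and buffer size are hypothetical):

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class ByteBufferPreadExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Hypothetical path; any existing HDFS file works
    try (FSDataInputStream in = fs.open(new Path("/tmp/example.dat"))) {
      ByteBuffer buf = ByteBuffer.allocateDirect(4096);
      // Guard with the new capability before using the ByteBuffer pread
      if (in.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
        // Read at offset 1024; the stream's own position is unchanged
        int n = in.read(1024L, buf);
        System.out.println("read " + n + " bytes");
      }
    }
  }
}
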
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
@@ -62,9 +63,10 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class CryptoInputStream extends FilterInputStream implements
-    Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
-    CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
-    ReadableByteChannel, CanUnbuffer, StreamCapabilities {
+    Seekable, PositionedReadable, ByteBufferReadable,
+    ByteBufferPositionedReadable, HasFileDescriptor, CanSetDropBehind,
+    CanSetReadahead, HasEnhancedByteBufferAccess, ReadableByteChannel,
+    CanUnbuffer, StreamCapabilities {
   private final byte[] oneByteBuf = new byte[1];
   private final CryptoCodec codec;
   private final Decryptor decryptor;
@@ -342,6 +344,24 @@ public class CryptoInputStream extends FilterInputStream implements
     }
   }

+  /** Positioned read using ByteBuffers. It is thread-safe */
+  @Override
+  public int read(long position, final ByteBuffer buf) throws IOException {
+    checkStream();
+    try {
+      int pos = buf.position();
+      final int n = ((ByteBufferPositionedReadable) in).read(position, buf);
+      if (n > 0) {
+        // This operation does not change the current offset of the file
+        decrypt(position, buf, n, pos);
+      }
+      return n;
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException(
+          "This stream does not support " + "positioned read.");
+    }
+  }
+
   /**
    * Decrypt length bytes in buffer starting at offset. Output is also put
    * into buffer starting at offset. It is thread-safe.
@@ -376,6 +396,79 @@ public class CryptoInputStream extends FilterInputStream implements
     }
   }

+  /**
+   * Decrypts the given {@link ByteBuffer} in place. {@code length} bytes are
+   * decrypted from {@code buf} starting at {@code start}.
+   * {@code buf.position()} and {@code buf.limit()} are unchanged after this
+   * method returns. This method is thread-safe.
+   *
+   * <p>
+   * This method decrypts the input buf chunk-by-chunk and writes the decrypted
+   * output back into the input buf. It uses two local buffers taken from the
+   * {@link #bufferPool} to assist in this process: one is designated as the
+   * input buffer and it stores a single chunk of the given buf, the other is
+   * designated as the output buffer, which stores the output of decrypting the
+   * input buffer. Both buffers are of size {@link #bufferSize}.
+   * </p>
+   *
+   * <p>
+   * Decryption is done by using a {@link Decryptor} and the
+   * {@link #decrypt(Decryptor, ByteBuffer, ByteBuffer, byte)} method. Once the
+   * decrypted data is written into the output buffer, it is copied back into
+   * buf. Both buffers are returned back into the pool once the entire buf is
+   * decrypted.
+   * </p>
+   *
+   * @param filePosition the current position of the file being read
+   * @param buf the {@link ByteBuffer} to decrypt
+   * @param length the number of bytes in {@code buf} to decrypt
+   * @param start the position in {@code buf} to start decrypting data from
+   */
+  private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
+      throws IOException {
+    ByteBuffer localInBuffer = null;
+    ByteBuffer localOutBuffer = null;
+
+    // Duplicate the buffer so we don't have to worry about resetting the
+    // original position and limit at the end of the method
+    buf = buf.duplicate();
+
+    int decryptedBytes = 0;
+    Decryptor localDecryptor = null;
+    try {
+      localInBuffer = getBuffer();
+      localOutBuffer = getBuffer();
+      localDecryptor = getDecryptor();
+      byte[] localIV = initIV.clone();
+      updateDecryptor(localDecryptor, filePosition, localIV);
+      byte localPadding = getPadding(filePosition);
+      // Set proper filePosition for inputdata.
+      localInBuffer.position(localPadding);
+
+      while (decryptedBytes < length) {
+        buf.position(start + decryptedBytes);
+        buf.limit(start + decryptedBytes
+            + Math.min(length - decryptedBytes, localInBuffer.remaining()));
+        localInBuffer.put(buf);
+        // Do decryption
+        try {
+          decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding);
+          buf.position(start + decryptedBytes);
+          buf.limit(start + length);
+          decryptedBytes += localOutBuffer.remaining();
+          buf.put(localOutBuffer);
+        } finally {
+          localPadding = afterDecryption(localDecryptor, localInBuffer,
+              filePosition + length, localIV);
+        }
+      }
+    } finally {
+      returnBuffer(localInBuffer);
+      returnBuffer(localOutBuffer);
+      returnDecryptor(localDecryptor);
+    }
+  }
+
   /** Positioned read fully. It is thread-safe */
   @Override
   public void readFully(long position, byte[] buffer, int offset, int length)
@@ -740,6 +833,8 @@ public class CryptoInputStream extends FilterInputStream implements
     case StreamCapabilities.READAHEAD:
     case StreamCapabilities.DROPBEHIND:
     case StreamCapabilities.UNBUFFER:
+    case StreamCapabilities.READBYTEBUFFER:
+    case StreamCapabilities.PREADBYTEBUFFER:
       return true;
     default:
       return false;
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementers of this interface provide a positioned read API that writes to a
+ * {@link ByteBuffer} rather than a {@code byte[]}.
+ *
+ * @see PositionedReadable
+ * @see ByteBufferReadable
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ByteBufferPositionedReadable {
+  /**
+   * Reads up to {@code buf.remaining()} bytes into buf from a given position in
+   * the file and returns the number of bytes read. Callers should use
+   * {@code buf.limit(...)} to control the size of the desired read and
+   * {@code buf.position(...)} to control the offset into the buffer the data
+   * should be written to.
+   * <p>
+   * After a successful call, {@code buf.position()} will be advanced by the
+   * number of bytes read and {@code buf.limit()} will be unchanged.
+   * <p>
+   * In the case of an exception, the state of the buffer (the contents of the
+   * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
+   * undefined, and callers should be prepared to recover from this eventuality.
+   * <p>
+   * Callers should use {@link StreamCapabilities#hasCapability(String)} with
+   * {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
+   * stream supports this interface, otherwise they might get a
+   * {@link UnsupportedOperationException}.
+   * <p>
+   * Implementations should treat 0-length requests as legitimate, and must not
+   * signal an error upon their receipt.
+   *
+   * @param position position within file
+   * @param buf the ByteBuffer to receive the results of the read operation.
+   * @return the number of bytes read, possibly zero, or -1 if reached
+   *         end-of-stream
+   * @throws IOException if there is some error performing the read
+   */
+  int read(long position, ByteBuffer buf) throws IOException;
+}
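
Since the new read(long, ByteBuffer) may return fewer bytes than requested, a caller that wants a full buffer has to loop. A sketch of such a helper, mirroring the readAll loop the tests below use (the class and method names are illustrative):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.ByteBufferPositionedReadable;

final class PreadUtil {
  // Reads until buf is full or end-of-stream; returns total bytes read.
  static int preadFully(ByteBufferPositionedReadable in, long pos,
      ByteBuffer buf) throws IOException {
    int n = 0;
    int total = 0;
    while (n != -1) {            // -1 signals end-of-stream
      total += n;
      if (!buf.hasRemaining()) { // buffer is full
        break;
      }
      n = in.read(pos + total, buf);
    }
    return total;
  }
}
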
@@ -38,7 +38,8 @@ import org.apache.hadoop.util.IdentityHashStore;
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable,
       ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
+      HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
+      ByteBufferPositionedReadable {
   /**
    * Map ByteBuffers that we have handed out to readers to ByteBufferPool
    * objects
@@ -246,4 +247,13 @@ public class FSDataInputStream extends DataInputStream
   public String toString() {
     return super.toString() + ": " + in;
   }
+
+  @Override
+  public int read(long position, ByteBuffer buf) throws IOException {
+    if (in instanceof ByteBufferPositionedReadable) {
+      return ((ByteBufferPositionedReadable)in).read(position, buf);
+    }
+    throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
+        "by input stream");
+  }
 }
@@ -59,6 +59,17 @@ public interface StreamCapabilities {
    */
   String UNBUFFER = "in:unbuffer";

+  /**
+   * Stream read(ByteBuffer) capability implemented by
+   * {@link ByteBufferReadable#read(java.nio.ByteBuffer)}.
+   */
+  String READBYTEBUFFER = "in:readbytebuffer";
+
+  /**
+   * Stream read(long, ByteBuffer) capability implemented by
+   * {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
+   */
+  String PREADBYTEBUFFER = "in:preadbytebuffer";
   /**
    * Capabilities that a stream can support and be queried for.
    */
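
Because blindly casting a stream to ByteBufferPositionedReadable can surface an UnsupportedOperationException at read time, callers are expected to probe the new capability first. A small illustrative guard (the helper name is hypothetical; hasCapability and PREADBYTEBUFFER come from the hunk above):

import java.io.InputStream;
import org.apache.hadoop.fs.StreamCapabilities;

final class CapabilityCheck {
  // Returns true only when the stream advertises ByteBuffer pread support.
  static boolean supportsByteBufferPread(InputStream in) {
    return in instanceof StreamCapabilities
        && ((StreamCapabilities) in).hasCapability(
            StreamCapabilities.PREADBYTEBUFFER);
  }
}
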
@@ -26,6 +26,7 @@ import java.nio.ByteOrder;
 import java.util.EnumSet;
 import java.util.Random;

+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -129,6 +130,32 @@ public abstract class CryptoStreamsTestBase {
     Assert.assertArrayEquals(result, expectedData);
   }

+  private int byteBufferPreadAll(ByteBufferPositionedReadable in,
+      ByteBuffer buf) throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (!buf.hasRemaining()) {
+        break;
+      }
+      n = in.read(total, buf);
+    }
+
+    return total;
+  }
+
+  private void byteBufferPreadCheck(ByteBufferPositionedReadable in)
+      throws Exception {
+    ByteBuffer result = ByteBuffer.allocate(dataLen);
+    int n = byteBufferPreadAll(in, result);
+
+    Assert.assertEquals(dataLen, n);
+    ByteBuffer expectedData = ByteBuffer.allocate(n);
+    expectedData.put(data, 0, n);
+    Assert.assertArrayEquals(result.array(), expectedData.array());
+  }
+
   protected OutputStream getOutputStream(int bufferSize) throws IOException {
     return getOutputStream(bufferSize, key, iv);
   }
@@ -289,19 +316,35 @@ public abstract class CryptoStreamsTestBase {
     return total;
   }

+  private int readAll(InputStream in, long pos, ByteBuffer buf)
+      throws IOException {
+    int n = 0;
+    int total = 0;
+    while (n != -1) {
+      total += n;
+      if (!buf.hasRemaining()) {
+        break;
+      }
+      n = ((ByteBufferPositionedReadable) in).read(pos + total, buf);
+    }
+
+    return total;
+  }
+
   /** Test positioned read. */
   @Test(timeout=120000)
   public void testPositionedRead() throws Exception {
-    OutputStream out = getOutputStream(defaultBufferSize);
-    writeData(out);
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }

-    InputStream in = getInputStream(defaultBufferSize);
-    // Pos: 1/3 dataLen
-    positionedReadCheck(in , dataLen / 3);
+    try (InputStream in = getInputStream(defaultBufferSize)) {
+      // Pos: 1/3 dataLen
+      positionedReadCheck(in, dataLen / 3);

-    // Pos: 1/2 dataLen
-    positionedReadCheck(in, dataLen / 2);
-    in.close();
+      // Pos: 1/2 dataLen
+      positionedReadCheck(in, dataLen / 2);
+    }
   }

   private void positionedReadCheck(InputStream in, int pos) throws Exception {
@@ -316,6 +359,35 @@ public abstract class CryptoStreamsTestBase {
     Assert.assertArrayEquals(readData, expectedData);
   }

+  /** Test positioned read with ByteBuffers. */
+  @Test(timeout=120000)
+  public void testPositionedReadWithByteBuffer() throws Exception {
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
+
+    try (InputStream in = getInputStream(defaultBufferSize)) {
+      // Pos: 1/3 dataLen
+      positionedReadCheckWithByteBuffer(in, dataLen / 3);
+
+      // Pos: 1/2 dataLen
+      positionedReadCheckWithByteBuffer(in, dataLen / 2);
+    }
+  }
+
+  private void positionedReadCheckWithByteBuffer(InputStream in, int pos)
+      throws Exception {
+    ByteBuffer result = ByteBuffer.allocate(dataLen);
+    int n = readAll(in, pos, result);
+
+    Assert.assertEquals(dataLen, n + pos);
+    byte[] readData = new byte[n];
+    System.arraycopy(result.array(), 0, readData, 0, n);
+    byte[] expectedData = new byte[n];
+    System.arraycopy(data, pos, expectedData, 0, n);
+    Assert.assertArrayEquals(readData, expectedData);
+  }
+
   /** Test read fully */
   @Test(timeout=120000)
   public void testReadFully() throws Exception {
|
||||||
Assert.assertArrayEquals(readData, expectedData);
|
Assert.assertArrayEquals(readData, expectedData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void byteBufferPreadCheck(InputStream in, ByteBuffer buf,
|
||||||
|
int bufPos) throws Exception {
|
||||||
|
// Test reading from position 0
|
||||||
|
buf.position(bufPos);
|
||||||
|
int n = ((ByteBufferPositionedReadable) in).read(0, buf);
|
||||||
|
Assert.assertEquals(bufPos + n, buf.position());
|
||||||
|
byte[] readData = new byte[n];
|
||||||
|
buf.rewind();
|
||||||
|
buf.position(bufPos);
|
||||||
|
buf.get(readData);
|
||||||
|
byte[] expectedData = new byte[n];
|
||||||
|
System.arraycopy(data, 0, expectedData, 0, n);
|
||||||
|
Assert.assertArrayEquals(readData, expectedData);
|
||||||
|
|
||||||
|
// Test reading from half way through the data
|
||||||
|
buf.position(bufPos);
|
||||||
|
n = ((ByteBufferPositionedReadable) in).read(dataLen / 2, buf);
|
||||||
|
Assert.assertEquals(bufPos + n, buf.position());
|
||||||
|
readData = new byte[n];
|
||||||
|
buf.rewind();
|
||||||
|
buf.position(bufPos);
|
||||||
|
buf.get(readData);
|
||||||
|
expectedData = new byte[n];
|
||||||
|
System.arraycopy(data, dataLen / 2, expectedData, 0, n);
|
||||||
|
Assert.assertArrayEquals(readData, expectedData);
|
||||||
|
}
|
||||||
|
|
||||||
/** Test byte buffer read with different buffer size. */
|
/** Test byte buffer read with different buffer size. */
|
||||||
@Test(timeout=120000)
|
@Test(timeout=120000)
|
||||||
public void testByteBufferRead() throws Exception {
|
public void testByteBufferRead() throws Exception {
|
||||||
OutputStream out = getOutputStream(defaultBufferSize);
|
try (OutputStream out = getOutputStream(defaultBufferSize)) {
|
||||||
writeData(out);
|
writeData(out);
|
||||||
|
}
|
||||||
|
|
||||||
// Default buffer size, initial buffer position is 0
|
// Default buffer size, initial buffer position is 0
|
||||||
InputStream in = getInputStream(defaultBufferSize);
|
InputStream in = getInputStream(defaultBufferSize);
|
||||||
|
@@ -561,6 +661,53 @@ public abstract class CryptoStreamsTestBase {
     in.close();
   }

+  /** Test byte buffer pread with different buffer size. */
+  @Test(timeout=120000)
+  public void testByteBufferPread() throws Exception {
+    try (OutputStream out = getOutputStream(defaultBufferSize)) {
+      writeData(out);
+    }
+
+    try (InputStream defaultBuf = getInputStream(defaultBufferSize);
+         InputStream smallBuf = getInputStream(smallBufferSize)) {
+
+      ByteBuffer buf = ByteBuffer.allocate(dataLen + 100);
+
+      // Default buffer size, initial buffer position is 0
+      byteBufferPreadCheck(defaultBuf, buf, 0);
+
+      // Default buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(defaultBuf, buf, 11);
+
+      // Small buffer size, initial buffer position is 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 0);
+
+      // Small buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 11);
+
+      // Test with direct ByteBuffer
+      buf = ByteBuffer.allocateDirect(dataLen + 100);
+
+      // Direct buffer, default buffer size, initial buffer position is 0
+      byteBufferPreadCheck(defaultBuf, buf, 0);
+
+      // Direct buffer, default buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(defaultBuf, buf, 11);
+
+      // Direct buffer, small buffer size, initial buffer position is 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 0);
+
+      // Direct buffer, small buffer size, initial buffer position is not 0
+      buf.clear();
+      byteBufferPreadCheck(smallBuf, buf, 11);
+    }
+  }
+
   @Test(timeout=120000)
   public void testCombinedOp() throws Exception {
     OutputStream out = getOutputStream(defaultBufferSize);
@@ -782,15 +929,15 @@ public abstract class CryptoStreamsTestBase {

     // Test pread
     try (InputStream in = getInputStream(smallBufferSize)) {
-      if (in instanceof PositionedReadable) {
-        PositionedReadable pin = (PositionedReadable) in;
+      if (in instanceof ByteBufferPositionedReadable) {
+        ByteBufferPositionedReadable bbpin = (ByteBufferPositionedReadable) in;

         // Test unbuffer after pread
-        preadCheck(pin);
+        byteBufferPreadCheck(bbpin);
         ((CanUnbuffer) in).unbuffer();

         // Test pread again after unbuffer
-        preadCheck(pin);
+        byteBufferPreadCheck(bbpin);

         // Test close after unbuffer
         ((CanUnbuffer) in).unbuffer();
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.EnumSet;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.CanSetDropBehind;
 import org.apache.hadoop.fs.CanSetReadahead;
@@ -184,9 +185,9 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {

   static class FakeInputStream extends InputStream
       implements Seekable, PositionedReadable, ByteBufferReadable,
-      HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess, CanUnbuffer,
+      ByteBufferPositionedReadable, HasFileDescriptor, CanSetDropBehind,
+      CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer,
       StreamCapabilities {
     private final byte[] oneByteBuf = new byte[1];
     private int pos = 0;
     private final byte[] data;
@@ -309,6 +310,31 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
       return -1;
     }

+    @Override
+    public int read(long position, ByteBuffer buf) throws IOException {
+      if (buf == null) {
+        throw new NullPointerException();
+      } else if (!buf.hasRemaining()) {
+        return 0;
+      }
+
+      if (position > length) {
+        throw new IOException("Cannot read after EOF.");
+      }
+      if (position < 0) {
+        throw new IOException("Cannot read to negative offset.");
+      }
+
+      checkStream();
+
+      if (position < length) {
+        int n = (int) Math.min(buf.remaining(), length - position);
+        buf.put(data, (int) position, n);
+        return n;
+      }
+      return -1;
+    }
+
     @Override
     public void readFully(long position, byte[] b, int off, int len)
         throws IOException {
@@ -384,6 +410,8 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
       case StreamCapabilities.READAHEAD:
       case StreamCapabilities.DROPBEHIND:
       case StreamCapabilities.UNBUFFER:
+      case StreamCapabilities.READBYTEBUFFER:
+      case StreamCapabilities.PREADBYTEBUFFER:
         return true;
       default:
         return false;
@@ -438,6 +466,8 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
     assertTrue(cis.hasCapability(StreamCapabilities.DROPBEHIND));
     assertTrue(cis.hasCapability(StreamCapabilities.READAHEAD));
     assertTrue(cis.hasCapability(StreamCapabilities.UNBUFFER));
+    assertTrue(cis.hasCapability(StreamCapabilities.READBYTEBUFFER));
+    assertTrue(cis.hasCapability(StreamCapabilities.PREADBYTEBUFFER));
     assertFalse(cis.hasCapability(StreamCapabilities.HFLUSH));
     assertFalse(cis.hasCapability(StreamCapabilities.HSYNC));
   }
@@ -91,11 +91,21 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
   @Test(timeout=10000)
   public void testByteBufferRead() throws Exception {}

+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testPositionedReadWithByteBuffer() throws IOException {}
+
   @Ignore("ChecksumFSOutputSummer doesn't support Syncable")
   @Override
   @Test(timeout=10000)
   public void testSyncable() throws IOException {}

+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testByteBufferPread() throws IOException {}
+
   @Ignore("ChecksumFSInputChecker doesn't support ByteBuffer read")
   @Override
   @Test(timeout=10000)
@@ -91,6 +91,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
   @Test(timeout=10000)
   public void testPositionedRead() throws IOException {}

+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testPositionedReadWithByteBuffer() throws IOException {}
+
   @Ignore("Wrapped stream doesn't support ReadFully")
   @Override
   @Test(timeout=10000)
@@ -106,6 +111,11 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
   @Test(timeout=10000)
   public void testByteBufferRead() throws IOException {}

+  @Ignore("Wrapped stream doesn't support ByteBufferPositionedReadable")
+  @Override
+  @Test(timeout=10000)
+  public void testByteBufferPread() throws IOException {}
+
   @Ignore("Wrapped stream doesn't support ByteBufferRead, Seek")
   @Override
   @Test(timeout=10000)
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
 import org.apache.hadoop.fs.CanSetDropBehind;
@@ -100,7 +101,8 @@ import javax.annotation.Nonnull;
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
     implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
-        HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
+        HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
+        ByteBufferPositionedReadable {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1761,6 +1763,14 @@ public class DFSInputStream extends FSInputStream
     throw new IOException("Mark/reset not supported");
   }

+  @Override
+  public int read(long position, final ByteBuffer buf) throws IOException {
+    if (!buf.hasRemaining()) {
+      return 0;
+    }
+    return pread(position, buf);
+  }
+
   /** Utility class to encapsulate data node info and its address. */
   static final class DNAddrPair {
     final DatanodeInfo info;
@@ -1983,6 +1993,8 @@ public class DFSInputStream extends FSInputStream
     case StreamCapabilities.READAHEAD:
     case StreamCapabilities.DROPBEHIND:
     case StreamCapabilities.UNBUFFER:
+    case StreamCapabilities.READBYTEBUFFER:
+    case StreamCapabilities.PREADBYTEBUFFER:
       return true;
     default:
       return false;
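
DFSInputStream routes the new call through its existing pread machinery, so the stream's own offset is untouched and an empty buffer returns 0, per the interface contract. A tiny sketch demonstrating the position guarantee (the helper is hypothetical; run with -ea so the assert fires):

import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class PreadPositionDemo {
  // Shows that a positioned read leaves the stream position (getPos)
  // unchanged, unlike a normal read.
  static void demo(FileSystem fs, Path path) throws Exception {
    try (FSDataInputStream in = fs.open(path)) {
      long before = in.getPos();
      ByteBuffer buf = ByteBuffer.allocate(128);
      in.read(0L, buf);             // pread at offset 0
      assert in.getPos() == before; // stream offset was not moved
    }
  }
}
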
@@ -49,6 +49,24 @@ extern "C" {
  */
 void hdfsFileDisableDirectRead(struct hdfsFile_internal *file);

+/**
+ * Determine if a file is using the "direct pread" optimization.
+ *
+ * @param file     The HDFS file
+ * @return         1 if the file is using the direct pread optimization,
+ *                 0 otherwise.
+ */
+int hdfsFileUsesDirectPread(struct hdfsFile_internal *file);
+
+/**
+ * Disable the direct pread optimization for a file.
+ *
+ * This is mainly provided for unit testing purposes.
+ *
+ * @param file     The HDFS file
+ */
+void hdfsFileDisableDirectPread(struct hdfsFile_internal *file);
+
 /**
  * Disable domain socket security checks.
  *
@@ -75,9 +75,9 @@ int main(int argc, char **argv) {
     const char *userPath = "/tmp/usertestfile.txt";

    char buffer[32], buffer2[256], rdbuffer[32];
-    tSize num_written_bytes, num_read_bytes;
+    tSize num_written_bytes, num_read_bytes, num_pread_bytes;
     hdfsFS fs, lfs;
-    hdfsFile writeFile, readFile, localFile, appendFile, userFile;
+    hdfsFile writeFile, readFile, preadFile, localFile, appendFile, userFile;
     tOffset currentPos, seekPos;
     int exists, totalResult, result, numEntries, i, j;
     const char *resp;
@@ -206,6 +206,7 @@ int main(int argc, char **argv) {
     }
     fprintf(stderr, "Read (direct) following %d bytes:\n%s\n",
             num_read_bytes, buffer);
+    memset(buffer, 0, strlen(fileContents + 1));
     if (hdfsSeek(fs, readFile, 0L)) {
         fprintf(stderr, "Failed to seek to file start!\n");
         exit(-1);
@@ -217,16 +218,15 @@ int main(int argc, char **argv) {

     num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
             sizeof(buffer));
+    if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+        fprintf(stderr, "Failed to read. Expected %s but got %s (%d bytes)\n",
+                fileContents, buffer, num_read_bytes);
+        exit(-1);
+    }
     fprintf(stderr, "Read following %d bytes:\n%s\n",
             num_read_bytes, buffer);

     memset(buffer, 0, strlen(fileContents + 1));
-
-    num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer,
-            sizeof(buffer));
-    fprintf(stderr, "Read following %d bytes:\n%s\n",
-            num_read_bytes, buffer);
-
     hdfsCloseFile(fs, readFile);

     // Test correct behaviour for unsupported filesystems
@@ -251,6 +251,104 @@ int main(int argc, char **argv) {
         hdfsCloseFile(lfs, localFile);
     }

+    {
+        // Pread tests
+
+        exists = hdfsExists(fs, readPath);
+
+        if (exists) {
+            fprintf(stderr, "Failed to validate existence of %s\n", readPath);
+            exit(-1);
+        }
+
+        preadFile = hdfsOpenFile(fs, readPath, O_RDONLY, 0, 0, 0);
+        if (!preadFile) {
+            fprintf(stderr, "Failed to open %s for reading!\n", readPath);
+            exit(-1);
+        }
+
+        if (!hdfsFileIsOpenForRead(preadFile)) {
+            fprintf(stderr, "hdfsFileIsOpenForRead: we just opened a file "
+                    "with O_RDONLY, and it did not show up as 'open for "
+                    "read'\n");
+            exit(-1);
+        }
+
+        fprintf(stderr, "hdfsAvailable: %d\n", hdfsAvailable(fs, preadFile));
+
+        num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n",
+                    fileContents, buffer, num_read_bytes);
+            exit(-1);
+        }
+        fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n",
+                num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            exit(-1);
+        }
+
+        // Test pread midway through the file rather than at the beginning
+        const char *fileContentsChunk = "World!";
+        num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) {
+            fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n",
+                    fileContentsChunk, buffer, num_read_bytes);
+            exit(-1);
+        }
+        fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            exit(-1);
+        }
+
+        // Disable the direct pread path so that we really go through the slow
+        // read path
+        hdfsFileDisableDirectPread(preadFile);
+
+        num_pread_bytes = hdfsPread(fs, preadFile, 0, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
+            fprintf(stderr, "Failed to pread. Expected %s but got %s (%d bytes)\n",
+                    fileContents, buffer, num_pread_bytes);
+            exit(-1);
+        }
+        fprintf(stderr, "Pread following %d bytes:\n%s\n", num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            exit(-1);
+        }
+
+        num_pread_bytes = hdfsPread(fs, preadFile, 7, (void*)buffer, sizeof(buffer));
+        if (strncmp(fileContentsChunk, buffer, strlen(fileContentsChunk)) != 0) {
+            fprintf(stderr, "Failed to pread (direct). Expected %s but got %s (%d bytes)\n",
+                    fileContentsChunk, buffer, num_read_bytes);
+            exit(-1);
+        }
+        fprintf(stderr, "Pread (direct) following %d bytes:\n%s\n", num_pread_bytes, buffer);
+        memset(buffer, 0, strlen(fileContents + 1));
+        if (hdfsTell(fs, preadFile) != 0) {
+            fprintf(stderr, "Pread changed position of file\n");
+            exit(-1);
+        }
+
+        hdfsCloseFile(fs, preadFile);
+
+        // Test correct behaviour for unsupported filesystems
+        localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0);
+
+        if (hdfsFileUsesDirectPread(localFile)) {
+            fprintf(stderr, "Direct pread support incorrectly detected for local "
+                    "filesystem\n");
+            exit(-1);
+        }
+
+        hdfsCloseFile(lfs, localFile);
+    }
+
     totalResult = 0;
     result = 0;
     {
@@ -38,6 +38,7 @@
 #define HADOOP_OSTRM "org/apache/hadoop/fs/FSDataOutputStream"
 #define HADOOP_STAT "org/apache/hadoop/fs/FileStatus"
 #define HADOOP_FSPERM "org/apache/hadoop/fs/permission/FsPermission"
+#define HADOOP_FS_DATA_INPUT_STREAM "org/apache/hadoop/fs/FSDataInputStream"
 #define JAVA_NET_ISA "java/net/InetSocketAddress"
 #define JAVA_NET_URI "java/net/URI"
 #define JAVA_STRING "java/lang/String"
@@ -56,8 +57,23 @@

 // Bit fields for hdfsFile_internal flags
 #define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
+#define HDFS_FILE_SUPPORTS_DIRECT_PREAD (1<<1)
+
+/**
+ * Reads bytes using the read(ByteBuffer) API. By using Java
+ * DirectByteBuffers we can avoid copying the bytes from kernel space into
+ * user space.
+ */
 tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
+
+/**
+ * Reads bytes using the read(long, ByteBuffer) API. By using Java
+ * DirectByteBuffers we can avoid copying the bytes from kernel space into
+ * user space.
+ */
+tSize preadDirect(hdfsFS fs, hdfsFile file, tOffset position, void* buffer,
+        tSize length);

 static void hdfsFreeFileInfoEntry(hdfsFileInfo *hdfsFileInfo);

 /**
@@ -235,6 +251,16 @@ void hdfsFileDisableDirectRead(hdfsFile file)
     file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
 }

+int hdfsFileUsesDirectPread(hdfsFile file)
+{
+    return !!(file->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD);
+}
+
+void hdfsFileDisableDirectPread(hdfsFile file)
+{
+    file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_PREAD;
+}
+
 int hdfsDisableDomainSocketSecurity(void)
 {
     jthrowable jthr;
@@ -922,6 +948,62 @@ int hdfsStreamBuilderSetDefaultBlockSize(struct hdfsStreamBuilder *bld,
     return 0;
 }

+/**
+ * Delegates to FsDataInputStream#hasCapability(String). Used to check if a
+ * given input stream supports certain methods, such as
+ * ByteBufferReadable#read(ByteBuffer).
+ *
+ * @param jFile the FsDataInputStream to call hasCapability on
+ * @param capability the name of the capability to query; for a full list of
+ *        possible values see StreamCapabilities
+ *
+ * @return true if the given jFile has the given capability, false otherwise
+ *
+ * @see org.apache.hadoop.fs.StreamCapabilities
+ */
+static int hdfsHasStreamCapability(jobject jFile,
+        const char *capability) {
+    int ret = 0;
+    jthrowable jthr = NULL;
+    jvalue jVal;
+    jstring jCapabilityString = NULL;
+
+    /* Get the JNIEnv* corresponding to current thread */
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return 0;
+    }
+
+    jthr = newJavaStr(env, capability, &jCapabilityString);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsHasStreamCapability(%s): newJavaStr", capability);
+        goto done;
+    }
+    jthr = invokeMethod(env, &jVal, INSTANCE, jFile,
+            HADOOP_FS_DATA_INPUT_STREAM, "hasCapability", "(Ljava/lang/String;)Z",
+            jCapabilityString);
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hdfsHasStreamCapability(%s): FSDataInputStream#hasCapability",
+                capability);
+        goto done;
+    }
+
+done:
+    destroyLocalReference(env, jthr);
+    destroyLocalReference(env, jCapabilityString);
+    if (ret) {
+        errno = ret;
+        return 0;
+    }
+    if (jVal.z == JNI_TRUE) {
+        return 1;
+    }
+    return 0;
+}
+
 static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
         int32_t bufferSize, int16_t replication, int64_t blockSize)
 {
@@ -932,7 +1014,7 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
       return f{is|os};
     */
     int accmode = flags & O_ACCMODE;
-    jstring jStrBufferSize = NULL, jStrReplication = NULL;
+    jstring jStrBufferSize = NULL, jStrReplication = NULL, jCapabilityString = NULL;
     jobject jConfiguration = NULL, jPath = NULL, jFile = NULL;
     jobject jFS = (jobject)fs;
     jthrowable jthr;
@@ -1090,16 +1172,16 @@ static hdfsFile hdfsOpenFileImpl(hdfsFS fs, const char *path, int flags,
     file->flags = 0;

     if ((flags & O_WRONLY) == 0) {
-        // Try a test read to see if we can do direct reads
-        char buf;
-        if (readDirect(fs, file, &buf, 0) == 0) {
-            // Success - 0-byte read should return 0
+        // Check the StreamCapabilities of jFile to see if we can do direct
+        // reads
+        if (hdfsHasStreamCapability(jFile, "in:readbytebuffer")) {
             file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
-        } else if (errno != ENOTSUP) {
-            // Unexpected error. Clear it, don't set the direct flag.
-            fprintf(stderr,
-                  "hdfsOpenFile(%s): WARN: Unexpected error %d when testing "
-                  "for direct read compatibility\n", path, errno);
+        }
+
+        // Check the StreamCapabilities of jFile to see if we can do direct
+        // preads
+        if (hdfsHasStreamCapability(jFile, "in:preadbytebuffer")) {
+            file->flags |= HDFS_FILE_SUPPORTS_DIRECT_PREAD;
         }
     }
     ret = 0;
@@ -1110,6 +1192,7 @@ done:
     destroyLocalReference(env, jConfiguration);
     destroyLocalReference(env, jPath);
     destroyLocalReference(env, jFile);
+    destroyLocalReference(env, jCapabilityString);
     if (ret) {
         if (file) {
             if (file->file) {
@@ -1311,6 +1394,12 @@ static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
     return 0;
 }

+/**
+ * If the underlying stream supports the ByteBufferReadable interface then
+ * this method will transparently use read(ByteBuffer). This can help
+ * improve performance as it avoids unnecessary copies between the kernel
+ * space, the Java process space, and the C process space.
+ */
 tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
 {
     jobject jInputStream;
@@ -1423,6 +1512,12 @@ tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
     return (jVal.i < 0) ? 0 : jVal.i;
 }

+/**
+ * If the underlying stream supports the ByteBufferPositionedReadable
+ * interface then this method will transparently use read(long, ByteBuffer).
+ * This can help improve performance as it avoids unnecessary copies between
+ * the kernel space, the Java process space, and the C process space.
+ */
 tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
         void* buffer, tSize length)
 {
@@ -1442,6 +1537,10 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
         return -1;
     }

+    if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_PREAD) {
+        return preadDirect(fs, f, position, buffer, length);
+    }
+
     env = getJNIEnv();
     if (env == NULL) {
         errno = EINTERNAL;
@@ -1491,6 +1590,60 @@ tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,
     return jVal.i;
 }

+tSize preadDirect(hdfsFS fs, hdfsFile f, tOffset position, void* buffer,
+        tSize length)
+{
+    // JAVA EQUIVALENT:
+    //  ByteBuffer buf = ByteBuffer.allocateDirect(length) // wraps C buffer
+    //  fis.read(position, buf);
+
+    jvalue jVal;
+    jthrowable jthr;
+    jobject bb;
+
+    //Get the JNIEnv* corresponding to current thread
+    JNIEnv* env = getJNIEnv();
+    if (env == NULL) {
+        errno = EINTERNAL;
+        return -1;
+    }
+
+    //Error checking... make sure that this file is 'readable'
+    if (f->type != HDFS_STREAM_INPUT) {
+        fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+        errno = EINVAL;
+        return -1;
+    }
+
+    //Read the requisite bytes
+    bb = (*env)->NewDirectByteBuffer(env, buffer, length);
+    if (bb == NULL) {
+        errno = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "readDirect: NewDirectByteBuffer");
+        return -1;
+    }
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, f->file,
+        HADOOP_FS_DATA_INPUT_STREAM, "read", "(JLjava/nio/ByteBuffer;)I",
+        position, bb);
+    destroyLocalReference(env, bb);
+    if (jthr) {
+        errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "preadDirect: FSDataInputStream#read");
+        return -1;
+    }
+    // Reached EOF, return 0
+    if (jVal.i < 0) {
+        return 0;
+    }
+    // 0 bytes read, return error
+    if (jVal.i == 0) {
+        errno = EINTR;
+        return -1;
+    }
+    return jVal.i;
+}
+
 tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
 {
     // JAVA EQUIVALENT
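
preadDirect wraps the caller's native buffer in a DirectByteBuffer and invokes FSDataInputStream#read(long, ByteBuffer) through JNI, as its JAVA EQUIVALENT comment notes. Expanded into plain Java, that call sequence is roughly (a sketch, not part of the commit; the class name is illustrative):

import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;

final class PreadDirectEquivalent {
  // Rough Java equivalent of preadDirect: a direct buffer standing in for
  // the caller's C buffer, read at an absolute offset.
  static int preadDirect(FSDataInputStream fis, long position, int length)
      throws Exception {
    ByteBuffer buf = ByteBuffer.allocateDirect(length);
    return fis.read(position, buf);
  }
}
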
@ -0,0 +1,269 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * This class tests the DFS positional read functionality on a single node
 * mini-cluster. These tests are inspired by {@link TestPread}. The tests
 * are much less comprehensive than other pread tests because pread already
 * internally uses {@link ByteBuffer}s.
 */
public class TestByteBufferPread {

  private static MiniDFSCluster cluster;
  private static FileSystem fs;
  private static byte[] fileContents;
  private static Path testFile;
  private static Random rand;

  private static final long SEED = 0xDEADBEEFL;
  private static final int BLOCK_SIZE = 4096;
  private static final int FILE_SIZE = 12 * BLOCK_SIZE;

  @BeforeClass
  public static void setup() throws IOException {
    // Setup the cluster with a small block size so we can create small files
    // that span multiple blocks
    Configuration conf = new Configuration();
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    fs = cluster.getFileSystem();

    // Create a test file that spans 12 blocks, and contains a bunch of random
    // bytes
    fileContents = new byte[FILE_SIZE];
    rand = new Random(SEED);
    rand.nextBytes(fileContents);
    testFile = new Path("/byte-buffer-pread-test.dat");
    try (FSDataOutputStream out = fs.create(testFile, (short) 3)) {
      out.write(fileContents);
    }
  }
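
  // Note (editor, not in the original patch): the 12-block layout above is
  // deliberate. A single read(position, buffer) call is not guaranteed to
  // fill the buffer, so the tests below loop until the read returns a
  // non-positive count.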
  /**
   * Test preads with {@link java.nio.HeapByteBuffer}s.
   */
  @Test
  public void testPreadWithHeapByteBuffer() throws IOException {
    testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
    testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE));
    testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
    testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE));
    testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE));
  }

  /**
   * Test preads with {@link java.nio.DirectByteBuffer}s.
   */
  @Test
  public void testPreadWithDirectByteBuffer() throws IOException {
    testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
    testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
    testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
    testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
    testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE));
  }

  /**
   * Reads the entire testFile using the pread API and validates that its
   * contents are properly loaded into the supplied {@link ByteBuffer}.
   */
  private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException {
    int bytesRead;
    int totalBytesRead = 0;
    try (FSDataInputStream in = fs.open(testFile)) {
      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
        totalBytesRead += bytesRead;
        // Check that each call to read changes the position of the ByteBuffer
        // correctly
        assertEquals(totalBytesRead, buffer.position());
      }

      // Make sure the buffer is full
      assertFalse(buffer.hasRemaining());
      // Make sure the contents of the read buffer equal the contents of the
      // file
      buffer.position(0);
      byte[] bufferContents = new byte[FILE_SIZE];
      buffer.get(bufferContents);
      assertArrayEquals(bufferContents, fileContents);
      buffer.position(buffer.limit());
    }
  }
  /**
   * Attempts to read the testFile into a {@link ByteBuffer} that is already
   * full, and validates that doing so does not change the contents of the
   * supplied {@link ByteBuffer}.
   */
  private void testPreadWithFullByteBuffer(ByteBuffer buffer)
      throws IOException {
    // Load some dummy data into the buffer
    byte[] existingBufferBytes = new byte[FILE_SIZE];
    rand.nextBytes(existingBufferBytes);
    buffer.put(existingBufferBytes);
    // Make sure the buffer is full
    assertFalse(buffer.hasRemaining());

    try (FSDataInputStream in = fs.open(testFile)) {
      // Attempt to read into the buffer, 0 bytes should be read since the
      // buffer is full
      assertEquals(0, in.read(buffer));

      // Double check the buffer is still full and its contents have not
      // changed
      assertFalse(buffer.hasRemaining());
      buffer.position(0);
      byte[] bufferContents = new byte[FILE_SIZE];
      buffer.get(bufferContents);
      assertArrayEquals(bufferContents, existingBufferBytes);
    }
  }
  /**
   * Reads half of the testFile into the {@link ByteBuffer} by setting a
   * {@link ByteBuffer#limit} on the buffer. Validates that only half of the
   * testFile is loaded into the buffer.
   */
  private void testPreadWithLimitedByteBuffer(
      ByteBuffer buffer) throws IOException {
    int bytesRead;
    int totalBytesRead = 0;
    // Set the buffer limit to half the size of the file
    buffer.limit(FILE_SIZE / 2);

    try (FSDataInputStream in = fs.open(testFile)) {
      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
        totalBytesRead += bytesRead;
        // Check that each call to read changes the position of the ByteBuffer
        // correctly
        assertEquals(totalBytesRead, buffer.position());
      }

      // Since we set the buffer limit to half the size of the file, we should
      // have only read half of the file into the buffer
      assertEquals(totalBytesRead, FILE_SIZE / 2);
      // Check that the buffer is full and the contents equal the first half of
      // the file
      assertFalse(buffer.hasRemaining());
      buffer.position(0);
      byte[] bufferContents = new byte[FILE_SIZE / 2];
      buffer.get(bufferContents);
      assertArrayEquals(bufferContents,
          Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
    }
  }
  /**
   * Reads half of the testFile into the {@link ByteBuffer} by setting the
   * {@link ByteBuffer#position} to half the size of the file. Validates that
   * only half of the testFile is loaded into the buffer.
   */
  private void testPreadWithPositionedByteBuffer(
      ByteBuffer buffer) throws IOException {
    int bytesRead;
    int totalBytesRead = 0;
    // Set the buffer position to half the size of the file
    buffer.position(FILE_SIZE / 2);

    try (FSDataInputStream in = fs.open(testFile)) {
      while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) {
        totalBytesRead += bytesRead;
        // Check that each call to read changes the position of the ByteBuffer
        // correctly
        assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position());
      }

      // Since we set the buffer position to half the size of the file, we
      // should have only read half of the file into the buffer
      assertEquals(totalBytesRead, FILE_SIZE / 2);
      // Check that the buffer is full and the contents equal the first half of
      // the file
      assertFalse(buffer.hasRemaining());
      buffer.position(FILE_SIZE / 2);
      byte[] bufferContents = new byte[FILE_SIZE / 2];
      buffer.get(bufferContents);
      assertArrayEquals(bufferContents,
          Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2));
    }
  }
  /**
   * Reads half of the testFile into the {@link ByteBuffer} by specifying a
   * position for the pread API that is half of the file size. Validates that
   * only half of the testFile is loaded into the buffer.
   */
  private void testPositionedPreadWithByteBuffer(
      ByteBuffer buffer) throws IOException {
    int bytesRead;
    int totalBytesRead = 0;

    try (FSDataInputStream in = fs.open(testFile)) {
      // Start reading from halfway through the file
      while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2,
          buffer)) > 0) {
        totalBytesRead += bytesRead;
        // Check that each call to read changes the position of the ByteBuffer
        // correctly
        assertEquals(totalBytesRead, buffer.position());
      }

      // Since we started reading halfway through the file, the buffer should
      // only be half full
      assertEquals(totalBytesRead, FILE_SIZE / 2);
      assertEquals(buffer.position(), FILE_SIZE / 2);
      assertTrue(buffer.hasRemaining());
      // Check that the buffer contents equal the second half of the file
      buffer.position(0);
      byte[] bufferContents = new byte[FILE_SIZE / 2];
      buffer.get(bufferContents);
      assertArrayEquals(bufferContents,
          Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE));
    }
  }
  @AfterClass
  public static void shutdown() throws IOException {
    try {
      fs.delete(testFile, false);
      fs.close();
    } finally {
      cluster.shutdown(true);
    }
  }
}
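For orientation, the call pattern the new test exercises looks like the following. This is a minimal sketch, not part of the patch: fs and the path are placeholders, and it assumes the opened stream's inner stream implements ByteBufferPositionedReadable so that FSDataInputStream.read(long, ByteBuffer) is supported rather than throwing UnsupportedOperationException.

// Sketch: fill a ByteBuffer using the new positioned read. read(pos, buf)
// does not move the stream's seek pointer; it only advances the buffer's
// position, which is exactly what the assertions in the test verify.
ByteBuffer buf = ByteBuffer.allocate(8192);
long pos = 0;
try (FSDataInputStream in = fs.open(new Path("/placeholder.dat"))) {
  int n;
  while (buf.hasRemaining() && (n = in.read(pos, buf)) > 0) {
    pos += n;
  }
}

A single call may return fewer bytes than the buffer can hold, which is why both this sketch and the tests above loop until a non-positive count is returned.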
@ -622,12 +622,11 @@ public class TestShortCircuitLocalRead {
     stm.write(fileData);
     stm.close();
     try {
-      checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser,
-          conf, shortCircuitFails);
-      //RemoteBlockReader have unsupported method read(ByteBuffer bf)
-      assertTrue(
-          "RemoteBlockReader unsupported method read(ByteBuffer bf) error",
-          checkUnsupportedMethod(fs, file1, fileData, readOffset));
+      checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, conf,
+          shortCircuitFails);
+      assertFalse(true);
+    } catch (UnsupportedOperationException unex) {
+      // RemoteBlockReader have unsupported method read(ByteBuffer bf)
     } catch(IOException e) {
       throw new IOException(
           "doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
@ -639,18 +638,4 @@ public class TestShortCircuitLocalRead {
     }
   }
 
-  private boolean checkUnsupportedMethod(FileSystem fs, Path file,
-      byte[] expected, int readOffset) throws IOException {
-    HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
-    ByteBuffer actual =
-        ByteBuffer.allocateDirect(expected.length - readOffset);
-    IOUtils.skipFully(stm, readOffset);
-    try {
-      stm.read(actual);
-    } catch(UnsupportedOperationException unex) {
-      return true;
-    }
-    return false;
-  }
-
 }
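The deleted checkUnsupportedMethod helper detected missing ByteBuffer-read support by catching UnsupportedOperationException; after this change the try/catch in doTestShortCircuitReadWithRemoteBlockReader covers that case directly. Callers who would rather probe than catch can ask the stream first. A hedged sketch, assuming the StreamCapabilities key for ByteBuffer pread is "in:preadbytebuffer" in this tree (verify against the branch before relying on it); fs, file, and buffer are placeholders:

// Sketch: probe for ByteBuffer pread support instead of catching
// UnsupportedOperationException from read(long, ByteBuffer).
try (FSDataInputStream in = fs.open(file)) {
  if (in.hasCapability("in:preadbytebuffer")) { // assumed capability key
    in.read(0, buffer); // single call shown; loop to fill in real code
  } else {
    byte[] tmp = new byte[buffer.remaining()];
    in.readFully(0, tmp); // classic byte[] pread fallback
    buffer.put(tmp);
  }
}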