diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index 5c032c3a6a5..25a971447f2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -28,9 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataInputStream extends DataInputStream
- implements Seekable, PositionedReadable, Closeable,
- ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
-
+ implements Seekable, PositionedReadable, Closeable,
+ ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
+ SupportsZeroCopy {
public FSDataInputStream(InputStream in)
throws IOException {
super(in);
@@ -167,4 +167,15 @@ public class FSDataInputStream extends DataInputStream
"support setting the drop-behind caching setting.");
}
}
+
+ @Override
+ public ZeroCopyCursor createZeroCopyCursor()
+ throws IOException, ZeroCopyUnavailableException {
+ try {
+ return ((SupportsZeroCopy)in).createZeroCopyCursor();
+    } catch (ClassCastException e) {
+ throw new ZeroCopyUnavailableException(e);
+ }
+ }
}
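
FSDataInputStream only forwards createZeroCopyCursor() to the wrapped stream, turning the ClassCastException into a checked ZeroCopyUnavailableException. Below is a minimal caller-side sketch (not part of this patch; the 4096-byte request size and the process() helper are illustrative) of falling back to an ordinary read when the wrapped stream has no zero-copy support:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.ZeroCopyCursor;
    import org.apache.hadoop.fs.ZeroCopyUnavailableException;

    public class ZeroCopyOrFallback {
      static void readSome(FSDataInputStream in) throws IOException {
        try {
          ZeroCopyCursor cursor = in.createZeroCopyCursor();
          try {
            // Fallback buffer covers the "mmap impossible" case; short reads
            // avoid EOFException when the file holds fewer than 4096 bytes.
            cursor.setFallbackBuffer(ByteBuffer.allocateDirect(4096));
            cursor.setAllowShortReads(true);
            cursor.read(4096);
            process(cursor.getData());   // valid until the next read() or close()
          } finally {
            cursor.close();
          }
        } catch (ZeroCopyUnavailableException e) {
          // The wrapped stream is not a SupportsZeroCopy implementation.
          byte[] buf = new byte[4096];
          int n = in.read(buf);
          process(ByteBuffer.wrap(buf, 0, Math.max(n, 0)));
        }
      }

      private static void process(ByteBuffer data) {
        System.out.println("read " + data.remaining() + " bytes");
      }
    }
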
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
index 8d668feeaba..e3308814ce2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.fs;
import java.io.*;
+import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ZeroCopyUnavailableException;
/****************************************************************
* FSInputStream is a generic old InputStream with a little bit
@@ -30,7 +32,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.LimitedPrivate({"HDFS"})
@InterfaceStability.Unstable
public abstract class FSInputStream extends InputStream
- implements Seekable, PositionedReadable {
+ implements Seekable, PositionedReadable, SupportsZeroCopy {
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
@@ -86,4 +88,11 @@ public abstract class FSInputStream extends InputStream
throws IOException {
readFully(position, buffer, 0, buffer.length);
}
+
+ @Override
+ public ZeroCopyCursor createZeroCopyCursor()
+ throws IOException, ZeroCopyUnavailableException {
+ throw new ZeroCopyUnavailableException("zero copy is not implemented " +
+ "for this filesystem type.");
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/SupportsZeroCopy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/SupportsZeroCopy.java
new file mode 100644
index 00000000000..2a4d51da07a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/SupportsZeroCopy.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Supports zero-copy reads.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface SupportsZeroCopy {
+ /**
+ * Get a zero-copy cursor to use for zero-copy reads.
+ *
+ * @throws IOException
+ * If there was an error creating the ZeroCopyCursor
+   * @throws ZeroCopyUnavailableException
+ * If this stream does not support zero-copy reads.
+ * This is used, for example, when one stream wraps another
+ * which may or may not support ZCR.
+ */
+ public ZeroCopyCursor createZeroCopyCursor()
+ throws IOException, ZeroCopyUnavailableException;
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyCursor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyCursor.java
new file mode 100644
index 00000000000..5181b49da2d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyCursor.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A ZeroCopyCursor allows you to make zero-copy reads.
+ *
+ * Cursors should be closed when they are no longer needed.
+ *
+ * Example:
+ * FSDataInputStream fis = fs.open("/file");
+ * ZeroCopyCursor cursor = fis.createZeroCopyCursor();
+ * try {
+ * cursor.read(128);
+ * ByteBuffer data = cursor.getData();
+ * processData(data);
+ * } finally {
+ * cursor.close();
+ * }
+ */
+public interface ZeroCopyCursor extends Closeable {
+ /**
+ * Set the fallback buffer used for this zero copy cursor.
+ * The fallback buffer is used when a true zero-copy read is impossible.
+ * If there is no fallback buffer, UnsupportedOperationException is thrown
+ * when a true zero-copy read cannot be done.
+ *
+ * @param fallbackBuffer The fallback buffer to set, or null for none.
+ */
+ public void setFallbackBuffer(ByteBuffer fallbackBuffer);
+
+ /**
+ * @return the fallback buffer in use, or null if there is none.
+ */
+ public ByteBuffer getFallbackBuffer();
+
+ /**
+ * @param skipChecksums Whether we should skip checksumming with this
+ * zero copy cursor.
+ */
+ public void setSkipChecksums(boolean skipChecksums);
+
+ /**
+ * @return Whether we should skip checksumming with this
+ * zero copy cursor.
+ */
+ public boolean getSkipChecksums();
+
+ /**
+ * @param allowShortReads Whether we should allow short reads.
+ */
+ public void setAllowShortReads(boolean allowShortReads);
+
+ /**
+ * @return Whether we should allow short reads.
+ */
+ public boolean getAllowShortReads();
+
+ /**
+ * Perform a zero-copy read.
+ *
+ * @param toRead The minimum number of bytes to read.
+ * Must not be negative. If we hit EOF before
+ * reading this many bytes, we will either throw
+ * EOFException (if allowShortReads = false), or
+ * return a short read (if allowShortReads = true).
+ * A short read could be as short as 0 bytes.
+ * @throws UnsupportedOperationException
+ * If a true zero-copy read cannot be done, and no fallback
+ * buffer was set.
+ * @throws EOFException
+ * If allowShortReads = false, and we can't read all the bytes
+ * that were requested. This will never be thrown if
+ * allowShortReads = true.
+ * @throws IOException
+ * If there was an error while reading the data.
+ */
+ public void read(int toRead)
+ throws UnsupportedOperationException, EOFException, IOException;
+
+ /**
+ * Get the current data buffer.
+ *
+ * This buffer will remain valid until either this cursor is closed, or we
+ * call read() again on this same cursor. You can find the amount of data
+ * that was read previously by calling ByteBuffer#remaining.
+ *
+ * @return The current data buffer.
+ */
+ public ByteBuffer getData();
+}
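
Building on the javadoc example above, the sketch below (not part of the patch; the chunk size and the treatment of a 0-byte short read as end-of-stream are assumptions of this sketch) reads a whole file in zero-copy chunks with a fallback buffer and short reads enabled:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.ZeroCopyCursor;

    public class ZeroCopyCount {
      public static long countBytes(FileSystem fs, Path path) throws IOException {
        final int chunk = 1024 * 1024;
        long total = 0;
        FSDataInputStream in = fs.open(path);
        try {
          ZeroCopyCursor cursor = in.createZeroCopyCursor();
          try {
            cursor.setFallbackBuffer(ByteBuffer.allocateDirect(chunk));
            cursor.setAllowShortReads(true);
            while (true) {
              cursor.read(chunk);
              ByteBuffer data = cursor.getData(); // valid until next read()/close()
              if (data.remaining() == 0) {
                break;                            // 0-byte short read: nothing left
              }
              total += data.remaining();
            }
          } finally {
            cursor.close();
          }
        } finally {
          in.close();
        }
        return total;
      }
    }
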
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java
new file mode 100644
index 00000000000..9cb68277e56
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+public class ZeroCopyUnavailableException extends IOException {
+ private static final long serialVersionUID = 0L;
+
+ public ZeroCopyUnavailableException(String message) {
+ super(message);
+ }
+
+ public ZeroCopyUnavailableException(String message, Exception e) {
+ super(message, e);
+ }
+
+ public ZeroCopyUnavailableException(Exception e) {
+ super(e);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index acfbea0c8a5..fb05e3ab1aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -330,4 +330,14 @@
+
+
+
+
+
+
+
+
+
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
index 56528927987..be9c53edf77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
@@ -142,6 +142,7 @@ target_link_libraries(test_native_mini_dfs
)
add_executable(test_libhdfs_threaded
+ main/native/libhdfs/expect.c
main/native/libhdfs/test_libhdfs_threaded.c
)
target_link_libraries(test_libhdfs_threaded
@@ -150,6 +151,16 @@ target_link_libraries(test_libhdfs_threaded
pthread
)
+add_executable(test_libhdfs_zerocopy
+ main/native/libhdfs/expect.c
+ main/native/libhdfs/test/test_libhdfs_zerocopy.c
+)
+target_link_libraries(test_libhdfs_zerocopy
+ hdfs
+ native_mini_dfs
+ pthread
+)
+
IF(REQUIRE_LIBWEBHDFS)
add_subdirectory(contrib/libwebhdfs)
ENDIF(REQUIRE_LIBWEBHDFS)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
index e1e40c0191f..456a79f7d89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
/**
* A BlockReader is responsible for reading a single block
@@ -81,4 +83,21 @@ public interface BlockReader extends ByteBufferReadable {
* All short-circuit reads are also local.
*/
boolean isShortCircuit();
+
+ /**
+ * Do a zero-copy read with the current block reader.
+ *
+ * We assume that the calling code has done bounds checking, and won't ask
+ * us for more bytes than are supposed to be visible (or are in the file).
+ *
+ * @param buffers The zero-copy buffers object.
+ * @param curBlock The current block.
+ * @param blockPos Position in the current block to start reading at.
+ * @param toRead The number of bytes to read from the block.
+ *
+ * @return true if the read was done, false otherwise.
+ */
+ boolean readZeroCopy(HdfsZeroCopyCursor buffers,
+ LocatedBlock curBlock, long blockPos, int toRead,
+ ClientMmapManager mmapManager);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index c1cb0b3db3f..3e430a150c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -22,11 +22,15 @@ import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
@@ -87,6 +91,8 @@ class BlockReaderLocal implements BlockReader {
private final ExtendedBlock block;
private final FileInputStreamCache fisCache;
+ private ClientMmap clientMmap;
+ private boolean mmapDisabled;
private static int getSlowReadBufferNumChunks(int bufSize,
int bytesPerChecksum) {
@@ -113,6 +119,8 @@ class BlockReaderLocal implements BlockReader {
this.datanodeID = datanodeID;
this.block = block;
this.fisCache = fisCache;
+ this.clientMmap = null;
+ this.mmapDisabled = false;
// read and handle the common header here. For now just a version
checksumIn.getChannel().position(0);
@@ -487,6 +495,10 @@ class BlockReaderLocal implements BlockReader {
@Override
public synchronized void close() throws IOException {
+ if (clientMmap != null) {
+ clientMmap.unref();
+ clientMmap = null;
+ }
if (fisCache != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("putting FileInputStream for " + filename +
@@ -534,4 +546,48 @@ class BlockReaderLocal implements BlockReader {
public boolean isShortCircuit() {
return true;
}
+
+ @Override
+ public boolean readZeroCopy(HdfsZeroCopyCursor cursor,
+ LocatedBlock curBlock, long blockPos, int toRead,
+ ClientMmapManager mmapManager) {
+ if (clientMmap == null) {
+ if (mmapDisabled) {
+ return false;
+ }
+ try {
+ clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
+ if (clientMmap == null) {
+ mmapDisabled = true;
+ return false;
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while setting up mmap for " + filename, e);
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (IOException e) {
+ LOG.error("unable to set up mmap for " + filename, e);
+ mmapDisabled = true;
+ return false;
+ }
+ }
+ long limit = blockPos + toRead;
+ if (limit > Integer.MAX_VALUE) {
+ /*
+ * In Java, ByteBuffers use a 32-bit length, capacity, offset, etc.
+ * This limits our mmap'ed regions to 2 GB in length.
+ * TODO: we can implement zero-copy for larger blocks by doing multiple
+ * mmaps.
+ */
+ mmapDisabled = true;
+ clientMmap.unref();
+ clientMmap = null;
+ return false;
+ }
+ ByteBuffer mmapBuffer = clientMmap.getMappedByteBuffer().duplicate();
+ mmapBuffer.position((int)blockPos);
+ mmapBuffer.limit((int)limit);
+ cursor.setMmap(clientMmap, mmapBuffer);
+ return true;
+ }
}
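
The zero-copy path above hands the cursor a duplicate() of the cached MappedByteBuffer with position/limit narrowed to the requested block range, so the shared mapping's own markers are never disturbed. A standalone sketch of that slicing idiom on a plain ByteBuffer (names are illustrative):

    import java.nio.ByteBuffer;

    public class SliceDemo {
      static ByteBuffer slice(ByteBuffer mapped, long offset, int length) {
        if (offset + length > Integer.MAX_VALUE) {
          throw new IllegalArgumentException("range exceeds the 2 GB ByteBuffer limit");
        }
        ByteBuffer view = mapped.duplicate(); // shares bytes, not position/limit
        view.position((int) offset);
        view.limit((int) (offset + length));
        return view;                          // remaining() == length
      }

      public static void main(String[] args) {
        ByteBuffer block = ByteBuffer.allocate(64);
        System.out.println(slice(block, 16, 8).remaining()); // prints 8
      }
    }
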
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index aeb6279bead..a4a4f680bfa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
@@ -701,4 +703,11 @@ class BlockReaderLocalLegacy implements BlockReader {
public boolean isShortCircuit() {
return true;
}
+
+ @Override
+ public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
+ LocatedBlock curBlock, long blockPos, int toRead,
+ ClientMmapManager mmapManager) {
+ return false;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 0aded40b073..102386931d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -103,6 +103,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -204,7 +205,43 @@ public class DFSClient implements java.io.Closeable {
private boolean shouldUseLegacyBlockReaderLocal;
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
+ private ClientMmapManager mmapManager;
+ private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
+ new ClientMmapManagerFactory();
+
+ private static final class ClientMmapManagerFactory {
+ private ClientMmapManager mmapManager = null;
+ /**
+ * Tracks the number of users of mmapManager.
+ */
+ private int refcnt = 0;
+
+ synchronized ClientMmapManager get(Configuration conf) {
+ if (refcnt++ == 0) {
+ mmapManager = ClientMmapManager.fromConf(conf);
+ } else {
+ String mismatches = mmapManager.verifyConfigurationMatches(conf);
+ if (!mismatches.isEmpty()) {
+ LOG.warn("The ClientMmapManager settings you specified " +
+ "have been ignored because another thread created the " +
+ "ClientMmapManager first. " + mismatches);
+ }
+ }
+ return mmapManager;
+ }
+
+ synchronized void unref(ClientMmapManager mmapManager) {
+ if (this.mmapManager != mmapManager) {
+ throw new IllegalArgumentException();
+ }
+ if (--refcnt == 0) {
+ IOUtils.cleanup(LOG, mmapManager);
+        this.mmapManager = null;
+ }
+ }
+ }
+
/**
* DFSClient configuration
*/
@@ -513,6 +550,7 @@ public class DFSClient implements java.io.Closeable {
new CachingStrategy(readDropBehind, readahead);
this.defaultWriteCachingStrategy =
new CachingStrategy(writeDropBehind, readahead);
+ this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
}
/**
@@ -716,9 +754,12 @@ public class DFSClient implements java.io.Closeable {
/** Abort and release resources held. Ignore all errors. */
void abort() {
+ if (mmapManager != null) {
+ MMAP_MANAGER_FACTORY.unref(mmapManager);
+ mmapManager = null;
+ }
clientRunning = false;
closeAllFilesBeingWritten(true);
-
try {
// remove reference to this client and stop the renewer,
// if there is no more clients under the renewer.
@@ -762,6 +803,10 @@ public class DFSClient implements java.io.Closeable {
*/
@Override
public synchronized void close() throws IOException {
+ if (mmapManager != null) {
+ MMAP_MANAGER_FACTORY.unref(mmapManager);
+ mmapManager = null;
+ }
if(clientRunning) {
closeAllFilesBeingWritten(false);
clientRunning = false;
@@ -2474,4 +2519,8 @@ public class DFSClient implements java.io.Closeable {
public CachingStrategy getDefaultWriteCachingStrategy() {
return defaultWriteCachingStrategy;
}
+
+ ClientMmapManager getMmapManager() {
+ return mmapManager;
+ }
}
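
ClientMmapManagerFactory is a reference-counted process-wide singleton: the first get() builds the manager from the configuration, later get() calls reuse it (warning about mismatched settings), and the last unref() closes it. A generic sketch of that pattern (Resource/Supplier names are stand-ins, not HDFS types):

    import java.io.Closeable;
    import java.io.IOException;

    class SharedResourceFactory<R extends Closeable> {
      interface Supplier<R> { R create(); }

      private final Supplier<R> supplier;
      private R instance = null;
      private int refcnt = 0;

      SharedResourceFactory(Supplier<R> supplier) {
        this.supplier = supplier;
      }

      synchronized R get() {
        if (refcnt++ == 0) {
          instance = supplier.create();   // first user creates the singleton
        }
        return instance;
      }

      synchronized void unref(R r) throws IOException {
        if (r != instance) {
          throw new IllegalArgumentException("unknown instance");
        }
        if (--refcnt == 0) {
          instance.close();               // last user tears it down
          instance = null;
        }
      }
    }
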
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f392df8d9ed..7d0fee4d11a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -373,6 +373,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
+ public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
+ public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 1024;
+ public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
+ public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT = 15 * 60 * 1000;
+  public static final String DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT = "dfs.client.mmap.cache.thread.runs.per.timeout";
+ public static final int DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT = 4;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
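
The three new keys control the mmap cache capacity, the idle timeout, and how many cleaner passes run per timeout. A hedged example of overriding them on the client side (the values are illustrative, not recommendations):

    import org.apache.hadoop.conf.Configuration;

    public class MmapCacheConfig {
      public static Configuration tunedConf() {
        Configuration conf = new Configuration();
        conf.setInt("dfs.client.mmap.cache.size", 256);
        conf.setLong("dfs.client.mmap.cache.timeout.ms", 5 * 60 * 1000L);
        conf.setInt("dfs.client.mmap.cache.thread.runs.per.timeout", 4);
        return conf;
      }
    }
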
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 4131ffa4426..06b3b68b2d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.ZeroCopyCursor;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
@@ -92,12 +93,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
this.totalBytesRead = 0;
this.totalLocalBytesRead = 0;
this.totalShortCircuitBytesRead = 0;
+ this.totalZeroCopyBytesRead = 0;
}
public ReadStatistics(ReadStatistics rhs) {
this.totalBytesRead = rhs.getTotalBytesRead();
this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
+ this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
}
/**
@@ -123,6 +126,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
public long getTotalShortCircuitBytesRead() {
return totalShortCircuitBytesRead;
}
+
+ /**
+ * @return The total number of zero-copy bytes read.
+ */
+ public long getTotalZeroCopyBytesRead() {
+ return totalZeroCopyBytesRead;
+ }
/**
* @return The total number of bytes read which were not local.
@@ -145,12 +155,21 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
this.totalLocalBytesRead += amt;
this.totalShortCircuitBytesRead += amt;
}
+
+ void addZeroCopyBytes(long amt) {
+ this.totalBytesRead += amt;
+ this.totalLocalBytesRead += amt;
+ this.totalShortCircuitBytesRead += amt;
+ this.totalZeroCopyBytesRead += amt;
+ }
private long totalBytesRead;
private long totalLocalBytesRead;
private long totalShortCircuitBytesRead;
+
+ private long totalZeroCopyBytesRead;
}
private final FileInputStreamCache fileInputStreamCache;
@@ -1393,4 +1412,67 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
this.cachingStrategy.setDropBehind(dropBehind);
closeCurrentBlockReader();
}
+
+ synchronized void readZeroCopy(HdfsZeroCopyCursor zcursor, int toRead)
+ throws IOException {
+ assert(toRead > 0);
+ if (((blockReader == null) || (blockEnd == -1)) &&
+ (pos < getFileLength())) {
+ /*
+ * If we don't have a blockReader, or the one we have has no more bytes
+ * left to read, we call seekToBlockSource to get a new blockReader and
+ * recalculate blockEnd. Note that we assume we're not at EOF here
+ * (we check this above).
+ */
+ if ((!seekToBlockSource(pos)) || (blockReader == null)) {
+ throw new IOException("failed to allocate new BlockReader " +
+ "at position " + pos);
+ }
+ }
+ long curPos = pos;
+ boolean canSkipChecksums = zcursor.getSkipChecksums();
+ long blockLeft = blockEnd - curPos + 1;
+ if (zcursor.getAllowShortReads()) {
+ if (blockLeft < toRead) {
+ toRead = (int)blockLeft;
+ }
+ }
+ if (canSkipChecksums && (toRead <= blockLeft)) {
+ long blockStartInFile = currentLocatedBlock.getStartOffset();
+ long blockPos = curPos - blockStartInFile;
+ if (blockReader.readZeroCopy(zcursor,
+ currentLocatedBlock, blockPos, toRead,
+ dfsClient.getMmapManager())) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("readZeroCopy read " + toRead + " bytes from " +
+ "offset " + curPos + " via the zero-copy read path. " +
+ "blockEnd = " + blockEnd);
+ }
+ readStatistics.addZeroCopyBytes(toRead);
+ seek(pos + toRead);
+ return;
+ }
+ }
+ /*
+ * Slow path reads.
+ *
+ * readStatistics will be updated when we call back into this
+ * stream's read methods.
+ */
+ long prevBlockEnd = blockEnd;
+ int slowReadAmount = zcursor.readViaSlowPath(toRead);
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("readZeroCopy read " + slowReadAmount + " bytes " +
+ "from offset " + curPos + " via the fallback read path. " +
+ "prevBlockEnd = " + prevBlockEnd + ", blockEnd = " + blockEnd +
+ ", canSkipChecksums = " + canSkipChecksums);
+ }
+ }
+
+ @Override
+ public ZeroCopyCursor createZeroCopyCursor()
+ throws IOException, UnsupportedOperationException {
+ return new HdfsZeroCopyCursor(this,
+ dfsClient.getConf().skipShortCircuitChecksums);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java
new file mode 100644
index 00000000000..42b3eb7bcf1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsZeroCopyCursor.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ZeroCopyCursor;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+
+public class HdfsZeroCopyCursor implements ZeroCopyCursor {
+ public static final Log LOG = LogFactory.getLog(HdfsZeroCopyCursor.class);
+ private DFSInputStream stream;
+ private boolean skipChecksums;
+ private boolean allowShortReads;
+ private ClientMmap mmap;
+ private ByteBuffer fallbackBuffer;
+ private ByteBuffer readBuffer;
+
+ HdfsZeroCopyCursor(DFSInputStream stream, boolean skipChecksums) {
+ this.stream = stream;
+ this.skipChecksums = skipChecksums;
+ this.allowShortReads = false;
+ this.mmap = null;
+ this.fallbackBuffer = null;
+ this.readBuffer = null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream = null;
+ if (mmap != null) {
+ mmap.unref();
+ mmap = null;
+ }
+ fallbackBuffer = null;
+ readBuffer = null;
+ }
+
+ @Override
+ public void setFallbackBuffer(ByteBuffer fallbackBuffer) {
+ this.fallbackBuffer = fallbackBuffer;
+ }
+
+ @Override
+ public ByteBuffer getFallbackBuffer() {
+ return this.fallbackBuffer;
+ }
+
+ @Override
+ public void setSkipChecksums(boolean skipChecksums) {
+ this.skipChecksums = skipChecksums;
+ }
+
+ @Override
+ public boolean getSkipChecksums() {
+ return this.skipChecksums;
+ }
+
+ @Override
+ public void setAllowShortReads(boolean allowShortReads) {
+ this.allowShortReads = allowShortReads;
+ }
+
+ @Override
+ public boolean getAllowShortReads() {
+ return this.allowShortReads;
+ }
+
+ @Override
+ public void read(int toRead) throws UnsupportedOperationException,
+ EOFException, IOException {
+ if (toRead < 0) {
+ throw new IllegalArgumentException("can't read " + toRead + " bytes.");
+ }
+ stream.readZeroCopy(this, toRead);
+ }
+
+ @Override
+ public ByteBuffer getData() {
+ return readBuffer;
+ }
+
+ int readViaSlowPath(int toRead) throws EOFException, IOException {
+ if (fallbackBuffer == null) {
+ throw new UnsupportedOperationException("unable to read via " +
+ "the fastpath, and there was no fallback buffer provided.");
+ }
+ fallbackBuffer.clear();
+ fallbackBuffer.limit(toRead); // will throw if toRead is too large
+
+ int totalRead = 0;
+ readBuffer = fallbackBuffer;
+ try {
+ while (toRead > 0) {
+ int nread = stream.read(fallbackBuffer);
+ if (nread < 0) {
+ break;
+ }
+ toRead -= nread;
+ totalRead += nread;
+ if (allowShortReads) {
+ break;
+ }
+ }
+ } finally {
+ fallbackBuffer.flip();
+ }
+ if ((toRead > 0) && (!allowShortReads)) {
+ throw new EOFException("only read " + totalRead + " bytes out of " +
+          "a requested " + (totalRead + toRead) + " before hitting EOF");
+ }
+ return totalRead;
+ }
+
+ void setMmap(ClientMmap mmap, ByteBuffer readBuffer) {
+    if (this.mmap != mmap) {
+      if (this.mmap != null) {
+        this.mmap.unref();
+      }
+      this.mmap = mmap;
+      mmap.ref();
+    }
+ this.readBuffer = readBuffer;
+ }
+
+ ClientMmap getMmap() {
+ return mmap;
+ }
+}
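
readViaSlowPath() is the standard clear / bound-by-limit / read-until-full-or-EOF / flip idiom over the fallback buffer. The standalone sketch below restates it against a plain ReadableByteChannel (no HDFS types; names are illustrative):

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    public class FillBuffer {
      static int fill(ReadableByteChannel ch, ByteBuffer buf, int toRead,
          boolean allowShort) throws IOException {
        buf.clear();
        buf.limit(toRead);              // throws if toRead exceeds capacity
        int total = 0;
        try {
          while (buf.hasRemaining()) {
            int n = ch.read(buf);
            if (n < 0) {
              break;                    // hit EOF
            }
            total += n;
            if (allowShort) {
              break;                    // one successful read is enough
            }
          }
        } finally {
          buf.flip();                   // expose [0, total) to the caller
        }
        if (!allowShort && total < toRead) {
          throw new EOFException("only read " + total + " of " + toRead + " bytes");
        }
        return total;
      }
    }
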
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 9d69dc18182..eab35c97821 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -27,9 +27,11 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -485,4 +487,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
public boolean isShortCircuit() {
return false;
}
+
+ @Override
+ public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
+ LocatedBlock curBlock, long blockPos, int toRead,
+ ClientMmapManager mmapManager) {
+ return false;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 27726ff9fd1..8c2bdf3c844 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -29,9 +29,11 @@ import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
@@ -40,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -451,4 +452,11 @@ public class RemoteBlockReader2 implements BlockReader {
public boolean isShortCircuit() {
return false;
}
+
+ @Override
+ public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
+ LocatedBlock curBlock, long blockPos, int toRead,
+ ClientMmapManager manager) {
+ return false;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
new file mode 100644
index 00000000000..566c2b5457c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import java.io.FileInputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A memory-mapped region used by an HDFS client.
+ *
+ * This class includes a reference count and some other information used by
+ * ClientMmapManager to track and cache mmaps.
+ */
+@InterfaceAudience.Private
+public class ClientMmap {
+ static final Log LOG = LogFactory.getLog(ClientMmap.class);
+
+ /**
+ * A reference to the manager of this mmap.
+ *
+   * This is only a weak reference to help minimize the damage done by
+ * code which leaks references accidentally.
+ */
+  private final WeakReference<ClientMmapManager> manager;
+
+ /**
+ * The actual mapped memory region.
+ */
+ private final MappedByteBuffer map;
+
+ /**
+ * A reference count tracking how many threads are using this object.
+ */
+ private final AtomicInteger refCount = new AtomicInteger(1);
+
+ /**
+ * Block pertaining to this mmap
+ */
+ private final ExtendedBlock block;
+
+ /**
+ * The DataNode where this mmap came from.
+ */
+ private final DatanodeID datanodeID;
+
+ /**
+ * The monotonic time when this mmap was last evictable.
+ */
+ private long lastEvictableTimeNs;
+
+ public static ClientMmap load(ClientMmapManager manager, FileInputStream in,
+ ExtendedBlock block, DatanodeID datanodeID)
+ throws IOException {
+ MappedByteBuffer map =
+ in.getChannel().map(MapMode.READ_ONLY, 0,
+ in.getChannel().size());
+ return new ClientMmap(manager, map, block, datanodeID);
+ }
+
+ private ClientMmap(ClientMmapManager manager, MappedByteBuffer map,
+ ExtendedBlock block, DatanodeID datanodeID)
+ throws IOException {
+    this.manager = new WeakReference<ClientMmapManager>(manager);
+ this.map = map;
+ this.block = block;
+ this.datanodeID = datanodeID;
+ this.lastEvictableTimeNs = 0;
+ }
+
+ /**
+ * Decrement the reference count on this object.
+ * Should be called with the ClientMmapManager lock held.
+ */
+ public void unref() {
+ int count = refCount.decrementAndGet();
+ if (count < 0) {
+ throw new IllegalArgumentException("can't decrement the " +
+ "reference count on this ClientMmap lower than 0.");
+ } else if (count == 0) {
+ ClientMmapManager man = manager.get();
+ if (man == null) {
+ unmap();
+ } else {
+ man.makeEvictable(this);
+ }
+ }
+ }
+
+ /**
+ * Increment the reference count on this object.
+ *
+ * @return The new reference count.
+ */
+ public int ref() {
+    return refCount.incrementAndGet();
+ }
+
+ @VisibleForTesting
+ public ExtendedBlock getBlock() {
+ return block;
+ }
+
+ DatanodeID getDatanodeID() {
+ return datanodeID;
+ }
+
+ public MappedByteBuffer getMappedByteBuffer() {
+ return map;
+ }
+
+ public void setLastEvictableTimeNs(long lastEvictableTimeNs) {
+ this.lastEvictableTimeNs = lastEvictableTimeNs;
+ }
+
+ public long getLastEvictableTimeNs() {
+ return this.lastEvictableTimeNs;
+ }
+
+ /**
+ * Unmap the memory region.
+ *
+ * There isn't any portable way to unmap a memory region in Java.
+ * So we use the sun.nio method here.
+ * Note that unmapping a memory region could cause crashes if code
+   * continues to reference the unmapped region.  However, if we don't
+ * manually unmap the memory, we are dependent on the finalizer to
+ * do it, and we have no idea when the finalizer will run.
+ */
+ void unmap() {
+ assert(refCount.get() == 0);
+ if (map instanceof sun.nio.ch.DirectBuffer) {
+ final sun.misc.Cleaner cleaner =
+ ((sun.nio.ch.DirectBuffer) map).cleaner();
+ cleaner.clean();
+ }
+ }
+}
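
ClientMmap's lifecycle is a plain reference count: the creator starts at 1, readers ref()/unref() around use, and the transition to 0 either returns the mmap to the cache's evictable set or unmaps it outright. A minimal sketch of that lifecycle, independent of the HDFS types (names are illustrative):

    import java.util.concurrent.atomic.AtomicInteger;

    class RefCounted {
      interface Releaser { void release(RefCounted r); }

      private final AtomicInteger refCount = new AtomicInteger(1); // creator's ref
      private final Releaser releaser;

      RefCounted(Releaser releaser) {
        this.releaser = releaser;
      }

      int ref() {
        return refCount.incrementAndGet();   // returns the new count
      }

      void unref() {
        int count = refCount.decrementAndGet();
        if (count < 0) {
          throw new IllegalStateException("reference count went negative");
        }
        if (count == 0) {
          releaser.release(this);            // e.g. make evictable, or unmap
        }
      }
    }
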
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
new file mode 100644
index 00000000000..7be519439e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import java.io.Closeable;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.util.Iterator;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Tracks mmap instances used on an HDFS client.
+ *
+ * mmaps can be used concurrently by multiple threads at once.
+ * mmaps cannot be closed while they are in use.
+ *
+ * The cache is important for performance, because the first time an mmap is
+ * created, the page table entries (PTEs) are not yet set up.
+ * Even when reading data that is entirely resident in memory, reading an
+ * mmap the second time is faster.
+ */
+@InterfaceAudience.Private
+public class ClientMmapManager implements Closeable {
+ public static final Log LOG = LogFactory.getLog(ClientMmapManager.class);
+
+ private boolean closed = false;
+
+ private final int cacheSize;
+
+ private final long timeoutNs;
+
+ private final int runsPerTimeout;
+
+ private final Lock lock = new ReentrantLock();
+
+ /**
+ * Maps block, datanode_id to the client mmap object.
+ * If the ClientMmap is in the process of being loaded,
+ * {@link Waitable#await()} will block.
+ *
+ * Protected by the ClientMmapManager lock.
+ */
+  private final TreeMap<Key, Waitable<ClientMmap>> mmaps =
+      new TreeMap<Key, Waitable<ClientMmap>>();
+
+ /**
+ * Maps the last use time to the client mmap object.
+ * We ensure that each last use time is unique by inserting a jitter of a
+ * nanosecond or two if necessary.
+ *
+ * Protected by the ClientMmapManager lock.
+ * ClientMmap objects that are in use are never evictable.
+ */
+  private final TreeMap<Long, ClientMmap> evictable =
+      new TreeMap<Long, ClientMmap>();
+
+ private final ScheduledThreadPoolExecutor executor =
+ new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+ setDaemon(true).setNameFormat("ClientMmapManager").
+ build());
+
+ /**
+ * The CacheCleaner for this ClientMmapManager. We don't create this
+ * and schedule it until it becomes necessary.
+ */
+ private CacheCleaner cacheCleaner;
+
+ /**
+ * Factory method to create a ClientMmapManager from a Hadoop
+ * configuration.
+ */
+ public static ClientMmapManager fromConf(Configuration conf) {
+ return new ClientMmapManager(conf.getInt(DFS_CLIENT_MMAP_CACHE_SIZE,
+ DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT),
+ conf.getLong(DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
+ DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT),
+ conf.getInt(DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT,
+ DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT));
+ }
+
+ public ClientMmapManager(int cacheSize, long timeoutMs, int runsPerTimeout) {
+ this.cacheSize = cacheSize;
+ this.timeoutNs = timeoutMs * 1000000;
+ this.runsPerTimeout = runsPerTimeout;
+ }
+
+ long getTimeoutMs() {
+ return this.timeoutNs / 1000000;
+ }
+
+ int getRunsPerTimeout() {
+ return this.runsPerTimeout;
+ }
+
+ public String verifyConfigurationMatches(Configuration conf) {
+ StringBuilder bld = new StringBuilder();
+ int cacheSize = conf.getInt(DFS_CLIENT_MMAP_CACHE_SIZE,
+ DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
+ if (this.cacheSize != cacheSize) {
+ bld.append("You specified a cache size of ").append(cacheSize).
+ append(", but the existing cache size is ").append(this.cacheSize).
+ append(". ");
+ }
+ long timeoutMs = conf.getLong(DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
+ DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
+ if (getTimeoutMs() != timeoutMs) {
+ bld.append("You specified a cache timeout of ").append(timeoutMs).
+ append(" ms, but the existing cache timeout is ").
+          append(getTimeoutMs()).append(" ms").append(". ");
+ }
+ int runsPerTimeout = conf.getInt(
+ DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT,
+ DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT);
+ if (getRunsPerTimeout() != runsPerTimeout) {
+ bld.append("You specified ").append(runsPerTimeout).
+ append(" runs per timeout, but the existing runs per timeout is ").
+          append(getRunsPerTimeout()).append(". ");
+ }
+ return bld.toString();
+ }
+
+  private static class Waitable<T> {
+ private T val;
+ private final Condition cond;
+
+ public Waitable(Condition cond) {
+ this.val = null;
+ this.cond = cond;
+ }
+
+ public T await() throws InterruptedException {
+ while (this.val == null) {
+ this.cond.await();
+ }
+ return this.val;
+ }
+
+ public void provide(T val) {
+ this.val = val;
+ this.cond.signalAll();
+ }
+ }
+
+  private static class Key implements Comparable<Key> {
+ private final ExtendedBlock block;
+ private final DatanodeID datanode;
+
+ Key(ExtendedBlock block, DatanodeID datanode) {
+ this.block = block;
+ this.datanode = datanode;
+ }
+
+ /**
+ * Compare two ClientMmap regions that we're storing.
+ *
+ * When we append to a block, we bump the genstamp. It is important to
+ * compare the genStamp here. That way, we will not return a shorter
+ * mmap than required.
+ */
+ @Override
+ public int compareTo(Key o) {
+ return ComparisonChain.start().
+ compare(block.getBlockId(), o.block.getBlockId()).
+ compare(block.getGenerationStamp(), o.block.getGenerationStamp()).
+ compare(block.getBlockPoolId(), o.block.getBlockPoolId()).
+ compare(datanode, o.datanode).
+ result();
+ }
+
+ @Override
+ public boolean equals(Object rhs) {
+ if (rhs == null) {
+ return false;
+ }
+ try {
+ Key o = (Key)rhs;
+ return (compareTo(o) == 0);
+ } catch (ClassCastException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return block.hashCode() ^ datanode.hashCode();
+ }
+ }
+
+ /**
+ * Thread which handles expiring mmaps from the cache.
+ */
+ private static class CacheCleaner implements Runnable, Closeable {
+    private WeakReference<ClientMmapManager> managerRef;
+    private ScheduledFuture<?> future;
+
+ CacheCleaner(ClientMmapManager manager) {
+      this.managerRef = new WeakReference<ClientMmapManager>(manager);
+ }
+
+ @Override
+ public void run() {
+ ClientMmapManager manager = managerRef.get();
+ if (manager == null) return;
+ long curTime = System.nanoTime();
+ try {
+ manager.lock.lock();
+ manager.evictStaleEntries(curTime);
+ } finally {
+ manager.lock.unlock();
+ }
+ }
+
+    void setFuture(ScheduledFuture<?> future) {
+ this.future = future;
+ }
+
+ @Override
+ public void close() throws IOException {
+ future.cancel(false);
+ }
+ }
+
+ /**
+   * Evict entries whose last-evictable time is older than curTime - timeoutNs.
+ *
+ * NOTE: you must call this function with the lock held.
+ */
+ private void evictStaleEntries(long curTime) {
+ if (closed) {
+ return;
+ }
+    Iterator<Entry<Long, ClientMmap>> iter =
+ evictable.entrySet().iterator();
+ while (iter.hasNext()) {
+      Entry<Long, ClientMmap> entry = iter.next();
+ if (entry.getKey() + timeoutNs >= curTime) {
+ return;
+ }
+ ClientMmap mmap = entry.getValue();
+ Key key = new Key(mmap.getBlock(), mmap.getDatanodeID());
+ mmaps.remove(key);
+ iter.remove();
+ mmap.unmap();
+ }
+ }
+
+ /**
+ * Evict one mmap object from the cache.
+ *
+ * NOTE: you must call this function with the lock held.
+ *
+ * @return True if an object was evicted; false if none
+ * could be evicted.
+ */
+ private boolean evictOne() {
+    Entry<Long, ClientMmap> entry = evictable.pollFirstEntry();
+ if (entry == null) {
+ // We don't want to try creating another mmap region, because the
+ // cache is full.
+ return false;
+ }
+ ClientMmap evictedMmap = entry.getValue();
+ Key evictedKey = new Key(evictedMmap.getBlock(),
+ evictedMmap.getDatanodeID());
+ mmaps.remove(evictedKey);
+ evictedMmap.unmap();
+ return true;
+ }
+
+ /**
+ * Create a new mmap object.
+ *
+ * NOTE: you must call this function with the lock held.
+ *
+ * @param key The key which describes this mmap.
+ * @param in The input stream to use to create the mmap.
+ * @return The new mmap object, or null if there were
+ * insufficient resources.
+ * @throws IOException If there was an I/O error creating the mmap.
+ */
+ private ClientMmap create(Key key, FileInputStream in) throws IOException {
+ if (mmaps.size() + 1 > cacheSize) {
+ if (!evictOne()) {
+ LOG.warn("mmap cache is full (with " + cacheSize + " elements) and " +
+ "nothing is evictable. Ignoring request for mmap with " +
+ "datanodeID=" + key.datanode + ", " + "block=" + key.block);
+ return null;
+ }
+ }
+ // Create the condition variable that other threads may wait on.
+    Waitable<ClientMmap> waitable =
+        new Waitable<ClientMmap>(lock.newCondition());
+ mmaps.put(key, waitable);
+ // Load the entry
+ boolean success = false;
+ ClientMmap mmap = null;
+ try {
+ try {
+ lock.unlock();
+ mmap = ClientMmap.load(this, in, key.block, key.datanode);
+ } finally {
+ lock.lock();
+ }
+ if (cacheCleaner == null) {
+ cacheCleaner = new CacheCleaner(this);
+      ScheduledFuture<?> future =
+ executor.scheduleAtFixedRate(cacheCleaner,
+ timeoutNs, timeoutNs / runsPerTimeout, TimeUnit.NANOSECONDS);
+ cacheCleaner.setFuture(future);
+ }
+ success = true;
+ } finally {
+ if (!success) {
+ LOG.warn("failed to create mmap for datanodeID=" + key.datanode +
+ ", " + "block=" + key.block);
+ mmaps.remove(key);
+ }
+ waitable.provide(mmap);
+ }
+ return mmap;
+ }
+
+ /**
+ * Get or create an mmap region.
+ *
+   * @param datanodeID The DataNode that owns the block for this mmap region.
+ * @param block The block ID, block pool ID, and generation stamp of
+ * the block we want to read.
+ * @param in An open file for this block. This stream is only used
+ * if we have to create a new mmap; if we use an
+ * existing one, it is ignored.
+ *
+ * @return The client mmap region.
+ */
+ public ClientMmap fetch(DatanodeID datanodeID, ExtendedBlock block,
+ FileInputStream in) throws IOException, InterruptedException {
+ LOG.debug("fetching mmap with datanodeID=" + datanodeID + ", " +
+ "block=" + block);
+ Key key = new Key(block, datanodeID);
+ ClientMmap mmap = null;
+ try {
+ lock.lock();
+ if (closed) {
+ throw new IOException("ClientMmapManager is closed.");
+ }
+ while (mmap == null) {
+      Waitable<ClientMmap> entry = mmaps.get(key);
+ if (entry == null) {
+ return create(key, in);
+ }
+ mmap = entry.await();
+ }
+ if (mmap.ref() == 1) {
+ // When going from nobody using the mmap (ref = 0) to somebody
+ // using the mmap (ref = 1), we must make the mmap un-evictable.
+ evictable.remove(mmap.getLastEvictableTimeNs());
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ LOG.debug("reusing existing mmap with datanodeID=" + datanodeID +
+ ", " + "block=" + block);
+ return mmap;
+ }
+
+ /**
+ * Make an mmap evictable.
+ *
+ * When an mmap is evictable, it may be removed from the cache if necessary.
+ * mmaps can only be evictable if nobody is using them.
+ *
+ * @param mmap The mmap to make evictable.
+ */
+ void makeEvictable(ClientMmap mmap) {
+ try {
+ lock.lock();
+ if (closed) {
+ // If this ClientMmapManager is closed, then don't bother with the
+ // cache; just close the mmap.
+ mmap.unmap();
+ return;
+ }
+ long now = System.nanoTime();
+ while (evictable.containsKey(now)) {
+ now++;
+ }
+ mmap.setLastEvictableTimeNs(now);
+ evictable.put(now, mmap);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ lock.lock();
+ closed = true;
+ IOUtils.cleanup(LOG, cacheCleaner);
+
+ // Unmap all the mmaps that nobody is using.
+ // The ones which are in use will be unmapped just as soon as people stop
+ // using them.
+ evictStaleEntries(Long.MAX_VALUE);
+
+ executor.shutdown();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @VisibleForTesting
+ public interface ClientMmapVisitor {
+ void accept(ClientMmap mmap);
+ }
+
+ @VisibleForTesting
+ public synchronized void visitMmaps(ClientMmapVisitor visitor)
+ throws InterruptedException {
+    for (Waitable<ClientMmap> entry : mmaps.values()) {
+ visitor.accept(entry.await());
+ }
+ }
+
+ public void visitEvictable(ClientMmapVisitor visitor)
+ throws InterruptedException {
+ for (ClientMmap mmap : evictable.values()) {
+ visitor.accept(mmap);
+ }
+ }
+}
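
The eviction index above is a TreeMap keyed by last-use nanotime, with the timestamp nudged forward until unique so two mmaps released in the same nanosecond cannot collide. A standalone sketch of that idiom (illustrative names, no HDFS types):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;

    class EvictionIndex<V> {
      private final TreeMap<Long, V> byLastUse = new TreeMap<Long, V>();

      /** Record that v became evictable "now"; the key is nudged until unique. */
      long add(V v) {
        long now = System.nanoTime();
        while (byLastUse.containsKey(now)) {
          now++;                              // one-nanosecond jitter
        }
        byLastUse.put(now, v);
        return now;
      }

      /** Remove and return values idle for longer than timeoutNs as of curTime. */
      List<V> expire(long curTime, long timeoutNs) {
        List<V> expired = new ArrayList<V>();
        while (!byLastUse.isEmpty()
            && byLastUse.firstKey() + timeoutNs < curTime) {
          expired.add(byLastUse.pollFirstEntry().getValue());
        }
        return expired;
      }
    }
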
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
new file mode 100644
index 00000000000..39761b5a03d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "expect.h"
+#include "hdfs.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+int expectFileStats(hdfsFile file,
+ uint64_t expectedTotalBytesRead,
+ uint64_t expectedTotalLocalBytesRead,
+ uint64_t expectedTotalShortCircuitBytesRead,
+ uint64_t expectedTotalZeroCopyBytesRead)
+{
+ struct hdfsReadStatistics *stats = NULL;
+ EXPECT_ZERO(hdfsFileGetReadStatistics(file, &stats));
+ if (expectedTotalBytesRead != UINT64_MAX) {
+ EXPECT_INT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
+ }
+ if (expectedTotalLocalBytesRead != UINT64_MAX) {
+ EXPECT_INT64_EQ(expectedTotalLocalBytesRead,
+ stats->totalLocalBytesRead);
+ }
+ if (expectedTotalShortCircuitBytesRead != UINT64_MAX) {
+ EXPECT_INT64_EQ(expectedTotalShortCircuitBytesRead,
+ stats->totalShortCircuitBytesRead);
+ }
+ if (expectedTotalZeroCopyBytesRead != UINT64_MAX) {
+ EXPECT_INT64_EQ(expectedTotalZeroCopyBytesRead,
+ stats->totalZeroCopyBytesRead);
+ }
+ hdfsFileFreeReadStatistics(stats);
+ return 0;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
index 9d5d863881b..3dc777127dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
@@ -19,16 +19,19 @@
#ifndef LIBHDFS_NATIVE_TESTS_EXPECT_H
#define LIBHDFS_NATIVE_TESTS_EXPECT_H
+#include <inttypes.h>
#include
+struct hdfsFile_internal;
+
#define EXPECT_ZERO(x) \
do { \
int __my_ret__ = x; \
if (__my_ret__) { \
int __my_errno__ = errno; \
- fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+ fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): got nonzero from %s\n", \
- __LINE__, __my_ret__, __my_errno__, #x); \
+ __FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
return __my_ret__; \
} \
} while (0);
@@ -38,9 +41,9 @@
void* __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ != NULL) { \
- fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
+ fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
"got non-NULL value %p from %s\n", \
- __LINE__, __my_errno__, __my_ret__, #x); \
+ __FILE__, __LINE__, __my_errno__, __my_ret__, #x); \
return -1; \
} \
} while (0);
@@ -50,8 +53,8 @@
void* __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ == NULL) { \
- fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
- "got NULL from %s\n", __LINE__, __my_errno__, #x); \
+ fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
+ "got NULL from %s\n", __FILE__, __LINE__, __my_errno__, #x); \
return -1; \
} \
} while (0);
@@ -61,15 +64,16 @@
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ != -1) { \
- fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
- "code %d (errno: %d): expected -1 from %s\n", __LINE__, \
+ fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+ "code %d (errno: %d): expected -1 from %s\n", \
+ __FILE__, __LINE__, \
__my_ret__, __my_errno__, #x); \
return -1; \
} \
if (__my_errno__ != e) { \
- fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+ fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): expected errno = %d from %s\n", \
- __LINE__, __my_ret__, __my_errno__, e, #x); \
+ __FILE__, __LINE__, __my_ret__, __my_errno__, e, #x); \
return -1; \
} \
} while (0);
@@ -79,9 +83,9 @@
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (!__my_ret__) { \
- fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
- "code %d (errno: %d): got zero from %s\n", __LINE__, \
- __my_ret__, __my_errno__, #x); \
+ fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+ "code %d (errno: %d): got zero from %s\n", __FILE__, __LINE__, \
+ __my_ret__, __my_errno__, #x); \
return -1; \
} \
} while (0);
@@ -91,9 +95,9 @@
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ < 0) { \
- fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+ fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): got negative return from %s\n", \
- __LINE__, __my_ret__, __my_errno__, #x); \
+ __FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
return __my_ret__; \
} \
} while (0);
@@ -103,9 +107,21 @@
int __my_ret__ = y; \
int __my_errno__ = errno; \
if (__my_ret__ != (x)) { \
- fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+ fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): expected %d\n", \
- __LINE__, __my_ret__, __my_errno__, (x)); \
+ __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
+ return -1; \
+ } \
+ } while (0);
+
+#define EXPECT_INT64_EQ(x, y) \
+ do { \
+ int64_t __my_ret__ = y; \
+ int __my_errno__ = errno; \
+ if (__my_ret__ != (x)) { \
+ fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+ "value %"PRId64" (errno: %d): expected %"PRId64"\n", \
+ __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
return -1; \
} \
} while (0);
@@ -117,4 +133,17 @@
ret = -errno; \
} while (ret == -EINTR);
+/**
+ * Test that an HDFS file has the given statistics.
+ *
+ * Any parameter can be set to UINT64_MAX to avoid checking it.
+ *
+ * @return 0 on success; error code otherwise
+ */
+int expectFileStats(struct hdfsFile_internal *file,
+ uint64_t expectedTotalBytesRead,
+ uint64_t expectedTotalLocalBytesRead,
+ uint64_t expectedTotalShortCircuitBytesRead,
+ uint64_t expectedTotalZeroCopyBytesRead);
+
#endif
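
For orientation, here is a minimal sketch (not part of the patch) of a test helper built on the new expectFileStats() declaration above; it checks only the zero-copy counter and passes UINT64_MAX for the statistics it does not care about. The helper name and the already-open hdfsFile are assumed for illustration.

    #include <errno.h>
    #include <stdint.h>

    #include "expect.h"
    #include "hdfs.h"

    /* Hypothetical helper: verify only totalZeroCopyBytesRead on an open file. */
    static int checkZeroCopyCounter(hdfsFile file, uint64_t expectedZeroCopyBytes)
    {
        /* UINT64_MAX means "do not check this statistic". */
        EXPECT_ZERO(expectFileStats(file,
            UINT64_MAX,                /* totalBytesRead: ignored */
            UINT64_MAX,                /* totalLocalBytesRead: ignored */
            UINT64_MAX,                /* totalShortCircuitBytesRead: ignored */
            expectedZeroCopyBytes));   /* totalZeroCopyBytesRead: checked */
        return 0;
    }
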
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
index 27824347692..cfffe385a8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
@@ -39,6 +39,7 @@
#define JAVA_NET_ISA "java/net/InetSocketAddress"
#define JAVA_NET_URI "java/net/URI"
#define JAVA_STRING "java/lang/String"
+#define HADOOP_ZERO_COPY_CURSOR "org/apache/hadoop/fs/ZeroCopyCursor"
#define JAVA_VOID "V"
@@ -143,6 +144,15 @@ int hdfsFileGetReadStatistics(hdfsFile file,
goto done;
}
s->totalShortCircuitBytesRead = jVal.j;
+ jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
+ "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
+ "getTotalZeroCopyBytesRead", "()J");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
+ goto done;
+ }
+ s->totalZeroCopyBytesRead = jVal.j;
*stats = s;
s = NULL;
ret = 0;
@@ -183,6 +193,25 @@ void hdfsFileDisableDirectRead(hdfsFile file)
file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
}
+int hdfsDisableDomainSocketSecurity(void)
+{
+ jthrowable jthr;
+ JNIEnv* env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+ jthr = invokeMethod(env, NULL, STATIC, NULL,
+ "org/apache/hadoop/net/unix/DomainSocket",
+ "disableBindPathValidation", "()V");
+ if (jthr) {
+ errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "DomainSocket#disableBindPathValidation");
+ return -1;
+ }
+ return 0;
+}
+
/**
* hdfsJniEnv: A wrapper struct to be used as 'value'
* while saving thread -> JNIEnv* mappings
@@ -220,40 +249,6 @@ static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path,
return NULL;
}
-/**
- * Set a configuration value.
- *
- * @param env The JNI environment
- * @param jConfiguration The configuration object to modify
- * @param key The key to modify
- * @param value The value to set the key to
- *
- * @return NULL on success; exception otherwise
- */
-static jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
- const char *key, const char *value)
-{
- jthrowable jthr;
- jstring jkey = NULL, jvalue = NULL;
-
- jthr = newJavaStr(env, key, &jkey);
- if (jthr)
- goto done;
- jthr = newJavaStr(env, value, &jvalue);
- if (jthr)
- goto done;
- jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
- HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING),
- JPARAM(JAVA_STRING), JAVA_VOID),
- jkey, jvalue);
- if (jthr)
- goto done;
-done:
- destroyLocalReference(env, jkey);
- destroyLocalReference(env, jvalue);
- return jthr;
-}
-
static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
const char *key, char **val)
{
@@ -2108,6 +2103,248 @@ int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
return 0;
}
+struct hadoopZeroCopyCursor* hadoopZeroCopyCursorAlloc(hdfsFile file)
+{
+ int ret;
+ jobject zcursor = NULL;
+ jvalue jVal;
+ jthrowable jthr;
+ JNIEnv* env;
+
+ env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return NULL;
+ }
+ if (file->type != INPUT) {
+ ret = EINVAL;
+ goto done;
+ }
+ jthr = invokeMethod(env, &jVal, INSTANCE, (jobject)file->file, HADOOP_ISTRM,
+ "createZeroCopyCursor", "()L"HADOOP_ZERO_COPY_CURSOR";");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopZeroCopyCursorAlloc: createZeroCopyCursor");
+ goto done;
+ }
+ zcursor = (*env)->NewGlobalRef(env, jVal.l);
+ if (!zcursor) {
+ ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+ "hadoopZeroCopyCursorAlloc: NewGlobalRef");
+ goto done;
+ }
+ ret = 0;
+done:
+ if (ret) {
+ errno = ret;
+ }
+ return (struct hadoopZeroCopyCursor*)zcursor;
+}
+
+int hadoopZeroCopyCursorSetFallbackBuffer(struct hadoopZeroCopyCursor* zcursor,
+ void *cbuf, uint32_t size)
+{
+ int ret;
+ jobject buffer = NULL;
+ jthrowable jthr;
+ JNIEnv* env;
+
+ env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+ buffer = (*env)->NewDirectByteBuffer(env, cbuf, size);
+ if (!buffer) {
+ ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+ "hadoopZeroCopyCursorSetFallbackBuffer: NewDirectByteBuffer("
+ "size=%"PRId32"):", size);
+ goto done;
+ }
+ jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
+ HADOOP_ZERO_COPY_CURSOR, "setFallbackBuffer",
+ "(Ljava/nio/ByteBuffer;)V", buffer);
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopZeroCopyCursorSetFallbackBuffer: "
+ "FileSystem#setFallbackBuffer");
+ goto done;
+ }
+ ret = 0;
+done:
+ if (ret) {
+ (*env)->DeleteLocalRef(env, buffer);
+ errno = ret;
+ return -1;
+ }
+ return 0;
+}
+
+int hadoopZeroCopyCursorSetSkipChecksums(struct hadoopZeroCopyCursor* zcursor,
+ int skipChecksums)
+{
+ JNIEnv* env;
+ jthrowable jthr;
+ jboolean shouldSkipChecksums = skipChecksums ? JNI_TRUE : JNI_FALSE;
+
+ env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+ jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
+ HADOOP_ZERO_COPY_CURSOR, "setSkipChecksums", "(Z)V",
+ shouldSkipChecksums);
+ if (jthr) {
+ errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopZeroCopyCursorSetSkipChecksums(): setSkipChecksums failed");
+ return -1;
+ }
+ return 0;
+}
+
+int hadoopZeroCopyCursorSetAllowShortReads(
+ struct hadoopZeroCopyCursor* zcursor, int allowShort)
+{
+ JNIEnv* env;
+ jthrowable jthr;
+ jboolean shouldAllowShort = allowShort ? JNI_TRUE : JNI_FALSE;
+
+ env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+ jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
+ HADOOP_ZERO_COPY_CURSOR, "setAllowShortReads", "(Z)V",
+ shouldAllowShort);
+ if (jthr) {
+ errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopZeroCopyCursorSetAllowShortReads(): setAllowShortReads "
+ "failed");
+ return -1;
+ }
+ return 0;
+}
+
+void hadoopZeroCopyCursorFree(struct hadoopZeroCopyCursor *zcursor)
+{
+ JNIEnv* env;
+ jthrowable jthr;
+
+ env = getJNIEnv();
+ if (env == NULL) {
+ return;
+ }
+ jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
+ HADOOP_ZERO_COPY_CURSOR, "close", "()V");
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopZeroCopyCursorFree(): close failed");
+ }
+ (*env)->DeleteGlobalRef(env, (jobject)zcursor);
+}
+
+/**
+ * Translate an exception from ZeroCopyCursor#read into a return code.
+ */
+static int translateZCRException(JNIEnv *env, jthrowable exc)
+{
+ int ret;
+ char *className = NULL;
+ jthrowable jthr = classNameOfObject(exc, env, &className);
+
+ if (jthr) {
+ fprintf(stderr, "hadoopZeroCopyRead: unknown "
+ "exception from read().\n");
+ destroyLocalReference(env, exc);
+ destroyLocalReference(env, jthr);
+ ret = EIO;
+ goto done;
+ }
+ if (!strcmp(className, "java.io.EOFException")) {
+ ret = 0; // EOF
+ goto done;
+ }
+ if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
+ ret = EPROTONOSUPPORT;
+ goto done;
+ }
+ ret = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+ "hadoopZeroCopyRead: ZeroCopyCursor#read failed");
+done:
+ free(className);
+ return ret;
+}
+
+int32_t hadoopZeroCopyRead(struct hadoopZeroCopyCursor *zcursor,
+ int32_t toRead, const void **data)
+{
+ int32_t ret, nRead = -1;
+ JNIEnv* env;
+ jthrowable jthr;
+ jobject byteBuffer = NULL;
+ uint8_t *addr;
+ jint position;
+ jvalue jVal;
+
+ env = getJNIEnv();
+ if (env == NULL) {
+ errno = EINTERNAL;
+ return -1;
+ }
+ jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
+ HADOOP_ZERO_COPY_CURSOR, "read", "(I)V", toRead);
+ if (jthr) {
+ ret = translateZCRException(env, jthr);
+ if (ret == 0) {
+ nRead = 0; // EOF: report 0 bytes read, as documented
+ }
+ goto done;
+ }
+ jthr = invokeMethod(env, &jVal, INSTANCE, (jobject)zcursor,
+ HADOOP_ZERO_COPY_CURSOR, "getData",
+ "()Ljava/nio/ByteBuffer;");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopZeroCopyRead(toRead=%"PRId32"): getData failed",
+ toRead);
+ goto done;
+ }
+ byteBuffer = jVal.l;
+ addr = (*env)->GetDirectBufferAddress(env, byteBuffer);
+ if (!addr) {
+ fprintf(stderr, "hadoopZeroCopyRead(toRead=%"PRId32"): "
+ "failed to get direct buffer address.\n", toRead);
+ ret = EIO;
+ goto done;
+ }
+ jthr = invokeMethod(env, &jVal, INSTANCE, byteBuffer,
+ "java/nio/ByteBuffer", "position", "()I");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopZeroCopyRead(toRead=%"PRId32"): ByteBuffer#position "
+ "failed", toRead);
+ goto done;
+ }
+ position = jVal.i;
+ jthr = invokeMethod(env, &jVal, INSTANCE, byteBuffer,
+ "java/nio/ByteBuffer", "remaining", "()I");
+ if (jthr) {
+ ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "hadoopZeroCopyRead(toRead=%"PRId32"): ByteBuffer#remaining "
+ "failed", toRead);
+ goto done;
+ }
+ ret = 0;
+ nRead = jVal.i;
+ *data = addr + position;
+done:
+ (*env)->DeleteLocalRef(env, byteBuffer);
+ if (nRead == -1) {
+ errno = ret;
+ return -1;
+ }
+ return nRead;
+}
+
char***
hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length)
{
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
index 1871665955c..69fad082b69 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
@@ -85,6 +85,7 @@ extern "C" {
uint64_t totalBytesRead;
uint64_t totalLocalBytesRead;
uint64_t totalShortCircuitBytesRead;
+ uint64_t totalZeroCopyBytesRead;
};
/**
@@ -680,7 +681,89 @@ extern "C" {
* @return 0 on success else -1
*/
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
-
+
+ /**
+ * Create a zero-copy cursor object.
+ *
+ * @param file The file to use for zero-copy reads.
+ *
+ * @return The zero-copy cursor, or NULL + errno on failure.
+ */
+ struct hadoopZeroCopyCursor* hadoopZeroCopyCursorAlloc(hdfsFile file);
+
+ /**
+ * Set the fallback buffer which will be used by the zero copy object.
+ *
+ * You are responsible for ensuring that this buffer stays valid until you
+ * either set a different buffer by calling this function again, or free the
+ * zero-copy cursor.
+ *
+ * @param zcursor The zero-copy cursor.
+ * @param cbuf The buffer to use.
+ * @param size Size of the buffer.
+ *
+ * @return 0 on success. -1 on error. Errno will be set on
+ * error.
+ */
+ int hadoopZeroCopyCursorSetFallbackBuffer(
+ struct hadoopZeroCopyCursor* zcursor, void *cbuf, uint32_t size);
+
+ /**
+ * Set whether our cursor should skip checksums or not.
+ *
+ * @param zcursor The cursor
+ * @param skipChecksums Nonzero to skip checksums.
+ *
+ * @return -1 on error, 0 otherwise.
+ */
+ int hadoopZeroCopyCursorSetSkipChecksums(
+ struct hadoopZeroCopyCursor* zcursor, int skipChecksums);
+
+ /**
+ * Set whether our cursor should allow short reads to occur.
+ * Short reads will always occur if there is not enough data to read
+ * (i.e., at EOF), but normally we don't return them when reading other
+ * parts of the file.
+ *
+ * @param zcursor The cursor
+ * @param allowShort Nonzero to allow short reads to occur.
+ *
+ * @return -1 on error, 0 otherwise.
+ */
+ int hadoopZeroCopyCursorSetAllowShortReads(
+ struct hadoopZeroCopyCursor* zcursor, int allowShort);
+
+ /**
+ * Free zero-copy cursor.
+ *
+ * This will dispose of the cursor allocated by hadoopZeroCopyCursorAlloc, as
+ * well as any memory map that we have created. You must be done with the
+ * data returned from hadoopZeroCopyRead before calling this.
+ *
+ * @param zcursor The zero-copy cursor.
+ */
+ void hadoopZeroCopyCursorFree(struct hadoopZeroCopyCursor *zcursor);
+
+ /**
+ * Perform a zero-copy read.
+ *
+ * @param zcursor The zero-copy cursor object.
+ * @param toRead The maximum amount to read.
+ * @param data (out param) on successful return, a pointer to the
+ * data. This pointer will remain valid until the next
+ * call to hadoopZeroCopyRead, or until
+ * hadoopZeroCopyCursorFree is called on zcursor.
+ *
+ * @return -1 if there was an error; errno will be set
+ * (EPROTONOSUPPORT if the read could not be done
+ * zero-copy and no fallback buffer was set).
+ * 0 if we hit end-of-file without reading anything.
+ * The positive number of bytes read otherwise. Short
+ * reads will happen only if EOF is reached.
+ */
+ int32_t hadoopZeroCopyRead(struct hadoopZeroCopyCursor *zcursor,
+ int32_t toRead, const void **data);
+
#ifdef __cplusplus
}
#endif
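
Pulling the declarations above together, the following is a hedged usage sketch (not from the patch) of the zero-copy cursor API. It assumes a file already opened with hdfsOpenFile and mirrors the call sequence exercised by test_libhdfs_zerocopy.c further down.

    #include <errno.h>
    #include <inttypes.h>
    #include <stdio.h>
    #include <stdlib.h>

    #include "hdfs.h"

    /* Sketch: read up to `len` bytes with zero-copy, with a fallback buffer so
     * that reads crossing a block boundary still succeed (copying into it). */
    static int32_t readWithZeroCopy(hdfsFile file, int32_t len)
    {
        struct hadoopZeroCopyCursor *zcursor;
        void *fallback = NULL;
        const void *data;
        int32_t nRead = -1;

        zcursor = hadoopZeroCopyCursorAlloc(file);
        if (!zcursor) {
            fprintf(stderr, "hadoopZeroCopyCursorAlloc: errno %d\n", errno);
            return -1;
        }
        /* Without a fallback buffer, a read that cannot be satisfied from a
         * single mmap fails with errno == EPROTONOSUPPORT (see
         * translateZCRException above). */
        fallback = malloc(len);
        if (!fallback || hadoopZeroCopyCursorSetFallbackBuffer(zcursor,
                fallback, (uint32_t)len)) {
            goto done;
        }
        nRead = hadoopZeroCopyRead(zcursor, len, &data);
        if (nRead > 0) {
            /* `data` stays valid until the next read or until the cursor is
             * freed; copy it out here if it must outlive the cursor. */
            fprintf(stderr, "read %" PRId32 " bytes\n", nRead);
        }
    done:
        hadoopZeroCopyCursorFree(zcursor);  /* also drops any mmap it holds */
        free(fallback);                     /* only after the cursor is gone */
        return nRead;
    }
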
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h
index b3ff4f2a637..0eab9a68aea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h
@@ -48,6 +48,15 @@ extern "C" {
* @param file The HDFS file
*/
void hdfsFileDisableDirectRead(struct hdfsFile_internal *file);
+
+ /**
+ * Disable domain socket security checks.
+ *
+ * @return 0 if domain socket security was disabled;
+ * -1 if not (errno will be set).
+ */
+ int hdfsDisableDomainSocketSecurity(void);
+
#ifdef __cplusplus
}
#endif
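
A short sketch (illustrative only, not from the patch) of how a standalone native test might use the new hook. nmdCreate() already calls it through nmdConfigureShortCircuit() when configureShortCircuit is set, so calling it directly is only needed when wiring up a domain socket path by hand.

    #include <stdio.h>

    #include "hdfs_test.h"

    /* Sketch: relax the domain socket path permission checks for this test
     * process before any short-circuit client is created. */
    int main(void)
    {
        if (hdfsDisableDomainSocketSecurity()) {
            fprintf(stderr, "failed to disable domain socket security\n");
            return 1;
        }
        fprintf(stderr, "domain socket security checks disabled\n");
        return 0;
    }
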
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
index c768c9c1d04..21ff9d9e0da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
@@ -608,3 +608,42 @@ JNIEnv* getJNIEnv(void)
return env;
}
+int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name)
+{
+ jclass clazz;
+ int ret;
+
+ clazz = (*env)->FindClass(env, name);
+ if (!clazz) {
+ printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+ "javaObjectIsOfClass(%s)", name);
+ return -1;
+ }
+ ret = (*env)->IsInstanceOf(env, obj, clazz);
+ (*env)->DeleteLocalRef(env, clazz);
+ return ret == JNI_TRUE ? 1 : 0;
+}
+
+jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+ const char *key, const char *value)
+{
+ jthrowable jthr;
+ jstring jkey = NULL, jvalue = NULL;
+
+ jthr = newJavaStr(env, key, &jkey);
+ if (jthr)
+ goto done;
+ jthr = newJavaStr(env, value, &jvalue);
+ if (jthr)
+ goto done;
+ jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
+ "org/apache/hadoop/conf/Configuration", "set",
+ "(Ljava/lang/String;Ljava/lang/String;)V",
+ jkey, jvalue);
+ if (jthr)
+ goto done;
+done:
+ (*env)->DeleteLocalRef(env, jkey);
+ (*env)->DeleteLocalRef(env, jvalue);
+ return jthr;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
index f37dea739fc..c2a7409a9c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h
@@ -114,6 +114,32 @@ jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name);
* */
JNIEnv* getJNIEnv(void);
+/**
+ * Figure out if a Java object is an instance of a particular class.
+ *
+ * @param env The Java environment.
+ * @param obj The object to check.
+ * @param name The class name to check.
+ *
+ * @return -1 if we failed to find the referenced class name.
+ * 0 if the object is not of the given class.
+ * 1 if the object is of the given class.
+ */
+int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name);
+
+/**
+ * Set a value in a configuration object.
+ *
+ * @param env The JNI environment
+ * @param jConfiguration The configuration object to modify
+ * @param key The key to modify
+ * @param value The value to set the key to
+ *
+ * @return NULL on success; exception otherwise
+ */
+jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+ const char *key, const char *value);
+
#endif /*LIBHDFS_JNI_HELPER_H*/
/**
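
For orientation, a small sketch (not part of the patch) showing how the two helpers declared above are meant to be called. The env, jConfiguration, and exc arguments are assumed to come from the caller, and real libhdfs code would report failures through printExceptionAndFree().

    #include <jni.h>

    #include "jni_helper.h"

    /* Sketch: set a configuration key, then classify an exception object. */
    static int demoJniHelpers(JNIEnv *env, jobject jConfiguration, jthrowable exc)
    {
        jthrowable jthr;
        int isEof;

        jthr = hadoopConfSetStr(env, jConfiguration,
                "dfs.client.read.shortcircuit", "true");
        if (jthr) {
            destroyLocalReference(env, jthr);   /* real code would print it */
            return -1;
        }
        isEof = javaObjectIsOfClass(env, exc, "java/io/EOFException");
        if (isEof < 0) {
            return -1;   /* could not resolve the class name */
        }
        return isEof;    /* 1 if exc is a java.io.EOFException, 0 otherwise */
    }
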
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
index a1476ca18f0..77e2f0766d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
@@ -17,14 +17,19 @@
*/
#include "exception.h"
+#include "hdfs.h"
+#include "hdfs_test.h"
#include "jni_helper.h"
#include "native_mini_dfs.h"
#include
#include
+#include
#include
#include
#include
+#include
+#include
#define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
#define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
@@ -39,8 +44,44 @@ struct NativeMiniDfsCluster {
* The NativeMiniDfsCluster object
*/
jobject obj;
+
+ /**
+ * Path to the domain socket, or the empty string if there is none.
+ */
+ char domainSocketPath[PATH_MAX];
};
+static jthrowable nmdConfigureShortCircuit(JNIEnv *env,
+ struct NativeMiniDfsCluster *cl, jobject cobj)
+{
+ jthrowable jthr;
+ char *tmpDir;
+
+ int ret = hdfsDisableDomainSocketSecurity();
+ if (ret) {
+ return newRuntimeError(env, "failed to disable hdfs domain "
+ "socket security: error %d", ret);
+ }
+ jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true");
+ if (jthr) {
+ return jthr;
+ }
+ tmpDir = getenv("TMPDIR");
+ if (!tmpDir) {
+ tmpDir = "/tmp";
+ }
+ snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
+ tmpDir, getpid(), rand());
+ jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path",
+ cl->domainSocketPath);
+ if (jthr) {
+ return jthr;
+ }
+ return NULL;
+}
+
struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
{
struct NativeMiniDfsCluster* cl = NULL;
@@ -81,6 +122,28 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
goto error;
}
}
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "nmdCreate: Configuration::setBoolean");
+ goto error;
+ }
+ // Disable 'minimum block size' -- it's annoying in tests.
+ (*env)->DeleteLocalRef(env, jconfStr);
+ jconfStr = NULL;
+ jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr);
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "nmdCreate: new String");
+ goto error;
+ }
+ jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
+ "setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL);
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "nmdCreate: Configuration::setLong");
+ goto error;
+ }
+ // Create the MiniDFSCluster object
jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
"(L"HADOOP_CONF";)V", cobj);
if (jthr) {
@@ -88,6 +151,14 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
"nmdCreate: NativeMiniDfsCluster#Builder#Builder");
goto error;
}
+ if (conf->configureShortCircuit) {
+ jthr = nmdConfigureShortCircuit(env, cl, cobj);
+ if (jthr) {
+ printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+ "nmdCreate: nmdConfigureShortCircuit error");
+ goto error;
+ }
+ }
jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
"format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
if (jthr) {
@@ -272,3 +343,29 @@ error_dlr_nn:
return ret;
}
+
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+ struct hdfsBuilder *bld)
+{
+ int port, ret;
+
+ hdfsBuilderSetNameNode(bld, "localhost");
+ port = nmdGetNameNodePort(cl);
+ if (port < 0) {
+ fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port);
+ return EIO;
+ }
+ hdfsBuilderSetNameNodePort(bld, port);
+ if (cl->domainSocketPath[0]) {
+ ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true");
+ if (ret) {
+ return ret;
+ }
+ ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path",
+ cl->domainSocketPath);
+ if (ret) {
+ return ret;
+ }
+ }
+ return 0;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
index 6bf29905ad9..41d69c2966a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
@@ -21,6 +21,7 @@
#include <jni.h> /* for jboolean */
+struct hdfsBuilder;
struct NativeMiniDfsCluster;
/**
@@ -28,17 +29,24 @@ struct NativeMiniDfsCluster;
*/
struct NativeMiniDfsConf {
/**
- * Nonzero if the cluster should be formatted prior to startup
+ * Nonzero if the cluster should be formatted prior to startup.
*/
jboolean doFormat;
+
/**
* Whether or not to enable webhdfs in MiniDfsCluster
*/
jboolean webhdfsEnabled;
+
/**
* The http port of the namenode in MiniDfsCluster
*/
jint namenodeHttpPort;
+
+ /**
+ * Nonzero if we should configure the cluster for short-circuit reads.
+ */
+ jboolean configureShortCircuit;
};
/**
@@ -84,7 +92,7 @@ void nmdFree(struct NativeMiniDfsCluster* cl);
*
* @return the port, or a negative error code
*/
-int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
/**
* Get the http address that's in use by the given (non-HA) nativeMiniDfs
@@ -101,4 +109,14 @@ int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
int *port, const char **hostName);
+/**
+ * Configure the HDFS builder appropriately to connect to this cluster.
+ *
+ * @param cl The cluster
+ * @param bld The hdfs builder
+ *
+ * @return 0 on success; a non-zero error code otherwise
+ */
+int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
+ struct hdfsBuilder *bld);
+
#endif
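
A hedged sketch (not part of the patch) of the intended call sequence, condensed from main() in test_libhdfs_zerocopy.c below; error handling is deliberately minimal.

    #include <stdio.h>

    #include "hdfs.h"
    #include "native_mini_dfs.h"

    /* Sketch: start a mini-cluster and connect an hdfs client to it. */
    static hdfsFS connectToMiniCluster(struct NativeMiniDfsCluster **outCluster)
    {
        struct NativeMiniDfsConf conf = {
            .doFormat = 1,
        };
        struct NativeMiniDfsCluster *cl;
        struct hdfsBuilder *bld;
        hdfsFS fs;

        cl = nmdCreate(&conf);
        if (!cl || nmdWaitClusterUp(cl)) {
            fprintf(stderr, "failed to start the mini-cluster\n");
            return NULL;
        }
        bld = hdfsNewBuilder();
        if (!bld) {
            return NULL;
        }
        /* Points the builder at localhost:<namenode port>, and copies over the
         * short-circuit settings if the cluster was configured with them. */
        if (nmdConfigureHdfsBuilder(cl, bld)) {
            return NULL;
        }
        fs = hdfsBuilderConnect(bld);
        if (!fs) {
            return NULL;
        }
        *outCluster = cl;
        return fs;
    }
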
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
new file mode 100644
index 00000000000..0b34540ba95
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "expect.h"
+#include "hdfs.h"
+#include "native_mini_dfs.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#define TO_STR_HELPER(X) #X
+#define TO_STR(X) TO_STR_HELPER(X)
+
+#define TEST_FILE_NAME_LENGTH 128
+#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096
+#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
+#define TEST_ZEROCOPY_NUM_BLOCKS 6
+#define SMALL_READ_LEN 16
+
+#define ZC_BUF_LEN 32768
+
+static uint8_t *getZeroCopyBlockData(int blockIdx)
+{
+ uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+ int i;
+ if (!buf) {
+ fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+ exit(1);
+ }
+ for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) {
+ buf[i] = blockIdx + (i % 17);
+ }
+ return buf;
+}
+
+static int getZeroCopyBlockLen(int blockIdx)
+{
+ if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) {
+ return 0;
+ } else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) {
+ return TEST_ZEROCOPY_LAST_BLOCK_SIZE;
+ } else {
+ return TEST_ZEROCOPY_FULL_BLOCK_SIZE;
+ }
+}
+
+static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused));
+
+static void printBuf(const uint8_t *buf, size_t len)
+{
+ size_t i;
+
+ for (i = 0; i < len; i++) {
+ fprintf(stderr, "%02x", buf[i]);
+ }
+ fprintf(stderr, "\n");
+}
+
+static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
+{
+ hdfsFile file = NULL;
+ struct hadoopZeroCopyCursor *zcursor = NULL;
+ uint8_t *backingBuffer = NULL, *block;
+ const void *zcPtr;
+
+ file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
+ EXPECT_NONNULL(file);
+ zcursor = hadoopZeroCopyCursorAlloc(file);
+ EXPECT_NONNULL(zcursor);
+ /* haven't read anything yet */
+ EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
+ block = getZeroCopyBlockData(0);
+ EXPECT_NONNULL(block);
+ /* first read is half of a block. */
+ EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
+ hadoopZeroCopyRead(zcursor,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr));
+ EXPECT_ZERO(memcmp(zcPtr, block, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+ /* read the next half of the block */
+ EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
+ hadoopZeroCopyRead(zcursor,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr));
+ EXPECT_ZERO(memcmp(zcPtr, block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
+ free(block);
+ EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+ /* Now let's read just a few bytes. */
+ EXPECT_INT_EQ(SMALL_READ_LEN,
+ hadoopZeroCopyRead(zcursor, SMALL_READ_LEN, &zcPtr));
+ block = getZeroCopyBlockData(1);
+ EXPECT_NONNULL(block);
+ EXPECT_ZERO(memcmp(block, zcPtr, SMALL_READ_LEN));
+ EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+ hdfsTell(fs, file));
+ EXPECT_ZERO(expectFileStats(file,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
+
+ /* Try to read a full block's worth of data. This will cross the block
+ * boundary, which means we have to fall back to non-zero-copy reads.
+ * However, because we don't have a backing buffer, the fallback will fail
+ * with EPROTONOSUPPORT. */
+ EXPECT_INT_EQ(-1,
+ hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr));
+ EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
+
+ /* Now set a backing buffer and try again. It should succeed this time. */
+ backingBuffer = malloc(ZC_BUF_LEN);
+ EXPECT_NONNULL(backingBuffer);
+ EXPECT_ZERO(hadoopZeroCopyCursorSetFallbackBuffer(zcursor,
+ backingBuffer, ZC_BUF_LEN));
+ EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE,
+ hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr));
+ EXPECT_ZERO(expectFileStats(file,
+ (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+ (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+ (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
+ EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, zcPtr,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
+ free(block);
+ block = getZeroCopyBlockData(2);
+ EXPECT_NONNULL(block);
+ EXPECT_ZERO(memcmp(block, zcPtr +
+ (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
+ free(block);
+ hadoopZeroCopyCursorFree(zcursor);
+ EXPECT_ZERO(hdfsCloseFile(fs, file));
+ free(backingBuffer);
+ return 0;
+}
+
+static int createZeroCopyTestFile(hdfsFS fs, char *testFileName,
+ size_t testFileNameLen)
+{
+ int blockIdx, blockLen;
+ hdfsFile file;
+ uint8_t *data;
+
+ snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d",
+ getpid(), rand());
+ file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1,
+ TEST_ZEROCOPY_FULL_BLOCK_SIZE);
+ EXPECT_NONNULL(file);
+ for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) {
+ blockLen = getZeroCopyBlockLen(blockIdx);
+ data = getZeroCopyBlockData(blockIdx);
+ EXPECT_NONNULL(data);
+ EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen));
+ }
+ EXPECT_ZERO(hdfsCloseFile(fs, file));
+ return 0;
+}
+
+/**
+ * Test that we can write a file with libhdfs and then read it back
+ */
+int main(void)
+{
+ int port;
+ struct NativeMiniDfsConf conf = {
+ .doFormat = 1,
+ .configureShortCircuit = 1,
+ };
+ char testFileName[TEST_FILE_NAME_LENGTH];
+ hdfsFS fs;
+ struct NativeMiniDfsCluster* cl;
+ struct hdfsBuilder *bld;
+
+ cl = nmdCreate(&conf);
+ EXPECT_NONNULL(cl);
+ EXPECT_ZERO(nmdWaitClusterUp(cl));
+ port = nmdGetNameNodePort(cl);
+ if (port < 0) {
+ fprintf(stderr, "TEST_ERROR: test_zerocopy: "
+ "nmdGetNameNodePort returned error %d\n", port);
+ return EXIT_FAILURE;
+ }
+ bld = hdfsNewBuilder();
+ EXPECT_NONNULL(bld);
+ EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld));
+ hdfsBuilderSetForceNewInstance(bld);
+ hdfsBuilderConfSetStr(bld, "dfs.block.size",
+ TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE));
+ /* ensure that we'll always get our mmaps */
+ hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum",
+ "true");
+ fs = hdfsBuilderConnect(bld);
+ EXPECT_NONNULL(fs);
+ EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName,
+ TEST_FILE_NAME_LENGTH));
+ EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName));
+ EXPECT_ZERO(hdfsDisconnect(fs));
+ EXPECT_ZERO(nmdShutdown(cl));
+ nmdFree(cl);
+ fprintf(stderr, "TEST_SUCCESS\n");
+ return EXIT_SUCCESS;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 06eca701264..a2dcf8c347c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1391,4 +1391,32 @@
linearly increases.
+
+<property>
+  <name>dfs.client.mmap.cache.size</name>
+  <value>1024</value>
+  <description>
+    When zero-copy reads are used, the DFSClient keeps a cache of recently used
+    memory mapped regions.  This parameter controls the maximum number of
+    entries that we will keep in that cache.
+
+    If this is set to 0, we will not allow mmap.
+
+    The larger this number is, the more file descriptors we will potentially
+    use for memory-mapped files.  mmapped files also use virtual address space.
+    You may need to increase your ulimit virtual address space limits before
+    increasing the client mmap cache size.
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.mmap.cache.timeout.ms</name>
+  <value>900000</value>
+  <description>
+    The minimum length of time that we will keep an mmap entry in the cache
+    between uses.  If an entry is in the cache longer than this, and nobody
+    uses it, it will be removed by a background thread.
+  </description>
+</property>
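
Client code can also override these keys per connection. The sketch below (illustrative, not from the patch) tunes them through hdfsBuilderConfSetStr(), the same call the native test uses for dfs.block.size; the host, port, and values shown are placeholders, not recommendations.

    #include "hdfs.h"

    /* Sketch: connect with a larger mmap cache and a shorter eviction timeout. */
    static hdfsFS connectWithMmapCacheTuning(const char *nnHost, int nnPort)
    {
        struct hdfsBuilder *bld = hdfsNewBuilder();
        if (!bld) {
            return NULL;
        }
        hdfsBuilderSetNameNode(bld, nnHost);
        hdfsBuilderSetNameNodePort(bld, nnPort);
        /* Return values ignored for brevity; real code should check them. */
        hdfsBuilderConfSetStr(bld, "dfs.client.mmap.cache.size", "2048");
        hdfsBuilderConfSetStr(bld, "dfs.client.mmap.cache.timeout.ms", "60000");
        return hdfsBuilderConnect(bld);
    }
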
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
index 057b79fd114..5015a56a42a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
@@ -23,24 +23,44 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
+import org.apache.commons.lang.SystemUtils;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ZeroCopyCursor;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestBlockReaderLocal {
+ private static TemporarySocketDirectory sockDir;
+
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ }
+
+ @AfterClass
+ public static void shutdown() throws IOException {
+ sockDir.close();
+ }
+
public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
int off2, int len) {
for (int i = 0; i < len; i++) {
@@ -100,10 +120,11 @@ public class TestBlockReaderLocal {
FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+ FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
- FileSystem fs = cluster.getFileSystem();
+ fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
try {
@@ -138,6 +159,7 @@ public class TestBlockReaderLocal {
test.doTest(blockReaderLocal, original);
} finally {
if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
if (dataIn != null) dataIn.close();
if (checkIn != null) checkIn.close();
@@ -382,10 +404,11 @@ public class TestBlockReaderLocal {
final long RANDOM_SEED = 4567L;
FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+ FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
- FileSystem fs = cluster.getFileSystem();
+ fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
try {
@@ -417,8 +440,327 @@ public class TestBlockReaderLocal {
} finally {
DFSInputStream.tcpReadsDisabledForTesting = false;
if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
if (sockDir != null) sockDir.close();
}
}
+
+ private static byte[] byteBufferToArray(ByteBuffer buf) {
+ byte resultArray[] = new byte[buf.remaining()];
+ buf.get(resultArray);
+ return resultArray;
+ }
+
+ public static HdfsConfiguration initZeroCopyTest() {
+ Assume.assumeTrue(NativeIO.isAvailable());
+ Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ sockDir = new TemporarySocketDirectory();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
+ conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+ return conf;
+ }
+
+ @Test
+ public void testZeroCopyReads() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ FSDataInputStream fsIn = null;
+ ZeroCopyCursor zcursor = null;
+
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+ IOUtils.readFully(fsIn, original, 0,
+ BlockReaderLocalTest.TEST_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ zcursor = fsIn.createZeroCopyCursor();
+ zcursor.setFallbackBuffer(ByteBuffer.
+ allocateDirect(1024 * 1024 * 4));
+ HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+ zcursor.read(4096);
+ ByteBuffer result = zcursor.getData();
+ Assert.assertEquals(4096, result.remaining());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+ byteBufferToArray(result));
+ } finally {
+ if (zcursor != null) zcursor.close();
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testShortZeroCopyReads() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ FSDataInputStream fsIn = null;
+ ZeroCopyCursor zcursor = null;
+
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+ IOUtils.readFully(fsIn, original, 0,
+ BlockReaderLocalTest.TEST_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ zcursor = fsIn.createZeroCopyCursor();
+ zcursor.setFallbackBuffer(ByteBuffer.
+ allocateDirect(1024 * 1024 * 4));
+ zcursor.setAllowShortReads(true);
+ HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+ zcursor.read(8192);
+ ByteBuffer result = zcursor.getData();
+ Assert.assertEquals(4096, result.remaining());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+ byteBufferToArray(result));
+ zcursor.read(4097);
+ result = zcursor.getData();
+ Assert.assertEquals(4096, result.remaining());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192),
+ byteBufferToArray(result));
+ zcursor.setAllowShortReads(false);
+ zcursor.read(4100);
+ result = zcursor.getData();
+ Assert.assertEquals(4100, result.remaining());
+
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 8192, 12292),
+ byteBufferToArray(result));
+ } finally {
+ if (zcursor != null) zcursor.close();
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testZeroCopyReadsNoBackingBuffer() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ FSDataInputStream fsIn = null;
+ ZeroCopyCursor zcursor = null;
+
+ FileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+ IOUtils.readFully(fsIn, original, 0,
+ BlockReaderLocalTest.TEST_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ zcursor = fsIn.createZeroCopyCursor();
+ zcursor.setAllowShortReads(false);
+ HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
+ // This read is longer than the file, and we do not have short reads enabled.
+ try {
+ zcursor.read(8192);
+ Assert.fail("expected UnsupportedOperationException");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+ // This read is longer than the block, and we do not have short reads enabled.
+ try {
+ zcursor.read(4097);
+ Assert.fail("expected UnsupportedOperationException");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+ // This read should succeed.
+ zcursor.read(4096);
+ ByteBuffer result = zcursor.getData();
+ Assert.assertEquals(4096, result.remaining());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalBytesRead());
+ Assert.assertEquals(4096,
+ dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
+ Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
+ byteBufferToArray(result));
+ } finally {
+ if (zcursor != null) zcursor.close();
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
+ private static class CountingVisitor
+ implements ClientMmapManager.ClientMmapVisitor {
+ int count = 0;
+
+ @Override
+ public void accept(ClientMmap mmap) {
+ count++;
+ }
+
+ public void reset() {
+ count = 0;
+ }
+ }
+
+ @Test
+ public void testZeroCopyMmapCache() throws Exception {
+ HdfsConfiguration conf = initZeroCopyTest();
+ MiniDFSCluster cluster = null;
+ final Path TEST_PATH = new Path("/a");
+ final int TEST_FILE_LENGTH = 16385;
+ final int RANDOM_SEED = 23453;
+ FSDataInputStream fsIn = null;
+ ZeroCopyCursor zcursor[] = { null, null, null, null, null };
+
+ DistributedFileSystem fs = null;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ byte original[] = new byte[TEST_FILE_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
+ fsIn.close();
+ fsIn = fs.open(TEST_PATH);
+ for (int i = 0; i < zcursor.length; i++) {
+ zcursor[i] = fsIn.createZeroCopyCursor();
+ zcursor[i].setAllowShortReads(false);
+ }
+ ClientMmapManager mmapManager = fs.getClient().getMmapManager();
+ CountingVisitor countingVisitor = new CountingVisitor();
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ mmapManager.visitEvictable(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ zcursor[0].read(4096);
+ fsIn.seek(0);
+ zcursor[1].read(4096);
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(1, countingVisitor.count);
+ countingVisitor.reset();
+ mmapManager.visitEvictable(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ countingVisitor.reset();
+
+ // The mmaps should be of the first block of the file.
+ final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+ mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
+ @Override
+ public void accept(ClientMmap mmap) {
+ Assert.assertEquals(firstBlock, mmap.getBlock());
+ }
+ });
+
+ // Read more blocks.
+ zcursor[2].read(4096);
+ zcursor[3].read(4096);
+ try {
+ zcursor[4].read(4096);
+ Assert.fail("expected UnsupportedOperationException");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+
+ // we should have 3 mmaps, 0 evictable
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(3, countingVisitor.count);
+ countingVisitor.reset();
+ mmapManager.visitEvictable(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+
+ // After we close the cursors, the mmaps should be evictable for
+ // a brief period of time. Then, they should be closed (we're
+ // using a very quick timeout)
+ for (int i = 0; i < zcursor.length; i++) {
+ IOUtils.closeStream(zcursor[i]);
+ }
+ while (true) {
+ countingVisitor.reset();
+ mmapManager.visitEvictable(countingVisitor);
+ if (0 == countingVisitor.count) {
+ break;
+ }
+ }
+ countingVisitor.reset();
+ mmapManager.visitMmaps(countingVisitor);
+ Assert.assertEquals(0, countingVisitor.count);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (fs != null) fs.close();
+ if (cluster != null) cluster.shutdown();
+ }
+
+ }
}