diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
index 9f93924b5be..b1f8749f581 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
@@ -111,7 +111,7 @@ public class DomainSocket implements Closeable {
* Disable validation of the server bind paths.
*/
@VisibleForTesting
- static void disableBindPathValidation() {
+ public static void disableBindPathValidation() {
validateBindPaths = false;
}
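
Making disableBindPathValidation() public (it stays @VisibleForTesting) presumably lets HDFS tests outside org.apache.hadoop.net.unix relax the bind-path permission checks. A minimal sketch of such a test; the class name and setup are hypothetical:

```java
import org.apache.hadoop.net.unix.DomainSocket;
import org.junit.BeforeClass;

// Hypothetical test illustrating the intended use of the now-public method:
// skip the strict permission checks on server bind paths so the test can
// bind a DomainSocket inside a plain temporary directory.
public class TestShortCircuitOverDomainSockets {
  @BeforeClass
  public static void init() {
    DomainSocket.disableBindPathValidation();
  }
}
```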
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index 4a3424bad32..5eb51670829 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -104,7 +104,7 @@ public class DataChecksum implements Checksum {
( (bytes[offset+2] & 0xff) << 16 ) |
( (bytes[offset+3] & 0xff) << 8 ) |
( (bytes[offset+4] & 0xff) );
- return newDataChecksum( Type.valueOf(bytes[0]), bytesPerChecksum );
+ return newDataChecksum( Type.valueOf(bytes[offset]), bytesPerChecksum );
}
/**
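
The one-character fix above matters whenever the checksum header is read from the middle of a buffer. A minimal sketch of the failure mode, assuming (from the surrounding code) a header of one type byte followed by a four-byte big-endian bytesPerChecksum, and assuming the public Type.id field; the values are illustrative:

```java
import org.apache.hadoop.util.DataChecksum;

// Hypothetical illustration of the off-by-offset bug fixed above.
public class ChecksumHeaderOffsetExample {
  public static void main(String[] args) {
    byte[] buf = new byte[16];
    int offset = 8;                              // header embedded mid-buffer
    buf[0] = 99;                                 // garbage before the header
    buf[offset] = (byte) DataChecksum.Type.CRC32C.id;
    buf[offset + 3] = 2;                         // 0x00000200 == 512 bytes/chunk
    // Before the fix the type byte was taken from buf[0] (garbage) no matter
    // what offset was passed; with bytes[offset] the header parses correctly.
    DataChecksum sum = DataChecksum.newDataChecksum(buf, offset);
    System.out.println(sum);
  }
}
```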
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
index 337cbec292e..af90e837028 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-347.txt
@@ -7,3 +7,6 @@ HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests.
(Colin Patrick McCabe via todd)
+
+HDFS-4356. BlockReaderLocal should use passed file descriptors rather than paths.
+(Colin Patrick McCabe via todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index c019b10e423..4eaa65c8e4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -290,6 +290,14 @@
+
+
+
+
+
+
+
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
index 2bbae525898..17ecb9e30d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -41,18 +41,29 @@ public interface BlockReader extends ByteBufferReadable {
*/
long skip(long n) throws IOException;
+ /**
+ * Returns an estimate of the number of bytes that can be read
+ * (or skipped over) from this input stream without performing
+ * network I/O.
+ */
+ int available() throws IOException;
+
/**
* Close the block reader.
*
* @param peerCache The PeerCache to put the Peer we're using back
* into, or null if we should simply close the Peer
* we're using (along with its Socket).
- * Some block readers, like BlockReaderLocal, may
- * not make use of this parameter.
+ * Ignored by Readers that don't maintain Peers.
+ * @param fisCache The FileInputStreamCache to put our FileInputStreams
+ * back into, or null if we should simply close them.
+ * Ignored by Readers that don't maintain
+ * FileInputStreams.
*
* @throws IOException
*/
- void close(PeerCache peerCache) throws IOException;
+ void close(PeerCache peerCache, FileInputStreamCache fisCache)
+ throws IOException;
/**
* Read exactly the given amount of data, throwing an exception
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index d6d39304966..95f7b94e0e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -17,22 +17,26 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.Socket;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSClient.Conf;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.Token;
@@ -58,6 +62,12 @@ public class BlockReaderFactory {
* @param clientName Client name. Used for log messages.
* @param peer The peer
* @param datanodeID The datanode that the Peer is connected to
+ * @param domainSocketFactory The DomainSocketFactory to notify if the Peer
+ * is a DomainPeer which turns out to be faulty.
+ * If null, no factory will be notified in this
+ * case.
+ * @param allowShortCircuitLocalReads True if short-circuit local reads
+ * should be allowed.
* @return New BlockReader instance, or null on error.
*/
@SuppressWarnings("deprecation")
@@ -70,11 +80,44 @@ public class BlockReaderFactory {
boolean verifyChecksum,
String clientName,
Peer peer,
- DatanodeID datanodeID)
- throws IOException {
+ DatanodeID datanodeID,
+ DomainSocketFactory domSockFactory,
+ boolean allowShortCircuitLocalReads)
+ throws IOException {
peer.setReadTimeout(conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT));
peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
+
+ if (peer.getDomainSocket() != null) {
+ if (allowShortCircuitLocalReads) {
+ // If this is a domain socket, and short-circuit local reads are
+ // enabled, try to set up a BlockReaderLocal.
+ BlockReader reader = newShortCircuitBlockReader(conf, file,
+ block, blockToken, startOffset, len, peer, datanodeID,
+ domSockFactory, verifyChecksum);
+ if (reader != null) {
+        // Once we've constructed the short-circuit block reader, we don't
+ // need the socket any more. So let's return it to the cache.
+ PeerCache peerCache = PeerCache.getInstance(
+ conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT),
+ conf.getLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT));
+ peerCache.put(datanodeID, peer);
+ return reader;
+ }
+ }
+ // If this is a domain socket and we couldn't (or didn't want to) set
+ // up a BlockReaderLocal, check that we are allowed to pass data traffic
+ // over the socket before proceeding.
+ if (!conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {
+ throw new IOException("Because we can't do short-circuit access, " +
+ "and data traffic over domain sockets is disabled, " +
+ "we cannot use this socket to talk to " + datanodeID);
+ }
+ }
+
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT)) {
return RemoteBlockReader.newBlockReader(file,
@@ -88,7 +131,94 @@ public class BlockReaderFactory {
verifyChecksum, clientName, peer, datanodeID);
}
}
-
+
+ /**
+ * Create a new short-circuit BlockReader.
+ *
+ * Here, we ask the DataNode to pass us file descriptors over our
+ * DomainSocket. If the DataNode declines to do so, we'll return null here;
+ * otherwise, we'll return the BlockReaderLocal. If the DataNode declines,
+ * this function will inform the DomainSocketFactory that short-circuit local
+ * reads are disabled for this DataNode, so that we don't ask again.
+ *
+ * @param conf the configuration.
+ * @param file the file name. Used in log messages.
+ * @param block The block object.
+ * @param blockToken The block token for security.
+ * @param startOffset The read offset, relative to block head.
+ * @param len The number of bytes to read, or -1 to read
+ * as many as possible.
+ * @param peer The peer to use.
+ * @param datanodeID The datanode that the Peer is connected to.
+ * @param domSockFactory The DomainSocketFactory to notify if the Peer
+ * is a DomainPeer which turns out to be faulty.
+ * If null, no factory will be notified in this
+ * case.
+ * @param verifyChecksum True if we should verify the checksums.
+ * Note: even if this is true, when
+ * DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
+ * set, we will skip checksums.
+ *
+ * @return The BlockReaderLocal, or null if the
+ * DataNode declined to provide short-circuit
+ * access.
+ * @throws IOException If there was a communication error.
+ */
+ private static BlockReaderLocal newShortCircuitBlockReader(
+ Configuration conf, String file, ExtendedBlock block,
+      Token<BlockTokenIdentifier> blockToken, long startOffset,
+ long len, Peer peer, DatanodeID datanodeID,
+ DomainSocketFactory domSockFactory, boolean verifyChecksum)
+ throws IOException {
+ final DataOutputStream out =
+ new DataOutputStream(new BufferedOutputStream(
+ peer.getOutputStream()));
+ new Sender(out).requestShortCircuitFds(block, blockToken, 1);
+ DataInputStream in =
+ new DataInputStream(peer.getInputStream());
+ BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(in));
+ DomainSocket sock = peer.getDomainSocket();
+ switch (resp.getStatus()) {
+ case SUCCESS:
+ BlockReaderLocal reader = null;
+ byte buf[] = new byte[1];
+ FileInputStream fis[] = new FileInputStream[2];
+ sock.recvFileInputStreams(fis, buf, 0, buf.length);
+ try {
+ reader = new BlockReaderLocal(conf, file, block,
+ startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum);
+ } finally {
+ if (reader == null) {
+ IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
+ }
+ }
+ return reader;
+ case ERROR_UNSUPPORTED:
+ if (!resp.hasShortCircuitAccessVersion()) {
+ DFSClient.LOG.warn("short-circuit read access is disabled for " +
+ "DataNode " + datanodeID + ". reason: " + resp.getMessage());
+ domSockFactory.disableShortCircuitForPath(sock.getPath());
+ } else {
+ DFSClient.LOG.warn("short-circuit read access for the file " +
+ file + " is disabled for DataNode " + datanodeID +
+ ". reason: " + resp.getMessage());
+ }
+ return null;
+ case ERROR_ACCESS_TOKEN:
+ String msg = "access control error while " +
+ "attempting to set up short-circuit access to " +
+ file + resp.getMessage();
+ DFSClient.LOG.debug(msg);
+ throw new InvalidBlockTokenException(msg);
+ default:
+ DFSClient.LOG.warn("error while attempting to set up short-circuit " +
+ "access to " + file + ": " + resp.getMessage());
+ domSockFactory.disableShortCircuitForPath(sock.getPath());
+ return null;
+ }
+ }
+
/**
* File name to print when accessing a block directly (from servlets)
* @param s Address of the block location
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index eacd902aa2b..1c34a71c26d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -18,30 +18,18 @@
package org.apache.hadoop.hdfs;
import java.io.DataInputStream;
-import java.io.File;
+import org.apache.hadoop.conf.Configuration;
import java.io.FileInputStream;
import java.io.IOException;
-import java.net.Socket;
import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
/**
@@ -53,74 +41,19 @@ import org.apache.hadoop.util.DataChecksum;
*
* - The client performing short circuit reads must be configured at the
* datanode.
- * - The client gets the path to the file where block is stored using
- * {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
- * RPC call
- * - Client uses kerberos authentication to connect to the datanode over RPC,
- * if security is enabled.
+ * - The client gets the file descriptors for the metadata file and the data
+ * file for the block using
+ * {@link org.apache.hadoop.hdfs.server.datanode.DataXceiver#requestShortCircuitFds}.
+ *
+ * - The client reads the file descriptors.
*
*/
class BlockReaderLocal implements BlockReader {
- private static final Log LOG = LogFactory.getLog(DFSClient.class);
-
- //Stores the cache and proxy for a local datanode.
- private static class LocalDatanodeInfo {
- private ClientDatanodeProtocol proxy = null;
-    private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
-
- LocalDatanodeInfo() {
- final int cacheSize = 10000;
- final float hashTableLoadFactor = 0.75f;
- int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
- cache = Collections
-          .synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
- hashTableCapacity, hashTableLoadFactor, true) {
- private static final long serialVersionUID = 1;
-
- @Override
- protected boolean removeEldestEntry(
-                Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
- return size() > cacheSize;
- }
- });
- }
-
- private synchronized ClientDatanodeProtocol getDatanodeProxy(
- DatanodeInfo node, Configuration conf, int socketTimeout,
- boolean connectToDnViaHostname) throws IOException {
- if (proxy == null) {
- proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
- socketTimeout, connectToDnViaHostname);
- }
- return proxy;
- }
-
- private synchronized void resetDatanodeProxy() {
- if (null != proxy) {
- RPC.stopProxy(proxy);
- proxy = null;
- }
- }
-
- private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
- return cache.get(b);
- }
-
- private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
- cache.put(b, info);
- }
-
- private void removeBlockLocalPathInfo(ExtendedBlock b) {
- cache.remove(b);
- }
- }
-
- // Multiple datanodes could be running on the local machine. Store proxies in
- // a map keyed by the ipc port of the datanode.
-  private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
+ static final Log LOG = LogFactory.getLog(DFSClient.class);
private final FileInputStream dataIn; // reader for the data file
private final FileInputStream checksumIn; // reader for the checksum file
+ private final boolean verifyChecksum;
/**
* Offset from the most recent chunk boundary at which the next read should
@@ -140,7 +73,6 @@ class BlockReaderLocal implements BlockReader {
private ByteBuffer slowReadBuff = null;
private ByteBuffer checksumBuff = null;
private DataChecksum checksum;
- private final boolean verifyChecksum;
private static DirectBufferPool bufferPool = new DirectBufferPool();
@@ -150,186 +82,90 @@ class BlockReaderLocal implements BlockReader {
/** offset in block where reader wants to actually read */
private long startOffset;
private final String filename;
+
+ private final DatanodeID datanodeID;
+ private final ExtendedBlock block;
- /**
- * The only way this object can be instantiated.
- */
- static BlockReaderLocal newBlockReader(Configuration conf, String file,
-      ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
- int socketTimeout, long startOffset, long length,
- boolean connectToDnViaHostname) throws IOException {
+ private static int getSlowReadBufferNumChunks(Configuration conf,
+ int bytesPerChecksum) {
- LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
- .getIpcPort());
- // check the cache first
- BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
- if (pathinfo == null) {
- pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
- connectToDnViaHostname);
- }
+ int bufSize =
+ conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
- // check to see if the file exists. It may so happen that the
- // HDFS file has been deleted and this block-lookup is occurring
- // on behalf of a new HDFS file. This time, the block file could
- // be residing in a different portion of the fs.data.dir directory.
- // In this case, we remove this entry from the cache. The next
- // call to this method will re-populate the cache.
- FileInputStream dataIn = null;
- FileInputStream checksumIn = null;
- BlockReaderLocal localBlockReader = null;
- boolean skipChecksumCheck = skipChecksumCheck(conf);
- try {
- // get a local file system
- File blkfile = new File(pathinfo.getBlockPath());
- dataIn = new FileInputStream(blkfile);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
- + blkfile.length() + " startOffset " + startOffset + " length "
- + length + " short circuit checksum " + !skipChecksumCheck);
- }
-
- if (!skipChecksumCheck) {
- // get the metadata file
- File metafile = new File(pathinfo.getMetaPath());
- checksumIn = new FileInputStream(metafile);
-
- // read and handle the common header here. For now just a version
- BlockMetadataHeader header = BlockMetadataHeader
- .readHeader(new DataInputStream(checksumIn));
- short version = header.getVersion();
- if (version != BlockMetadataHeader.VERSION) {
- LOG.warn("Wrong version (" + version + ") for metadata file for "
- + blk + " ignoring ...");
- }
- DataChecksum checksum = header.getChecksum();
- long firstChunkOffset = startOffset
- - (startOffset % checksum.getBytesPerChecksum());
- localBlockReader = new BlockReaderLocal(conf, file, blk, token,
- startOffset, length, pathinfo, checksum, true, dataIn,
- firstChunkOffset, checksumIn);
- } else {
- localBlockReader = new BlockReaderLocal(conf, file, blk, token,
- startOffset, length, pathinfo, dataIn);
- }
- } catch (IOException e) {
- // remove from cache
- localDatanodeInfo.removeBlockLocalPathInfo(blk);
- DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
- + " from cache because local file " + pathinfo.getBlockPath()
- + " could not be opened.");
- throw e;
- } finally {
- if (localBlockReader == null) {
- if (dataIn != null) {
- dataIn.close();
- }
- if (checksumIn != null) {
- checksumIn.close();
- }
- }
- }
- return localBlockReader;
- }
-
- private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
- LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
- if (ldInfo == null) {
- ldInfo = new LocalDatanodeInfo();
- localDatanodeInfoMap.put(port, ldInfo);
- }
- return ldInfo;
- }
-
- private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
- DatanodeInfo node, Configuration conf, int timeout,
-      Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
- throws IOException {
- LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
- BlockLocalPathInfo pathinfo = null;
- ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
- conf, timeout, connectToDnViaHostname);
- try {
- // make RPC to local datanode to find local pathnames of blocks
- pathinfo = proxy.getBlockLocalPathInfo(blk, token);
- if (pathinfo != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cached location of block " + blk + " as " + pathinfo);
- }
- localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
- }
- } catch (IOException e) {
- localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
- throw e;
- }
- return pathinfo;
- }
-
- private static boolean skipChecksumCheck(Configuration conf) {
- return conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
- }
-
- private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
- int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
-
- if (bufferSizeBytes < bytesPerChecksum) {
- throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufferSizeBytes + ") " +
- "is not large enough to hold a single chunk (" + bytesPerChecksum + "). Please configure " +
+ if (bufSize < bytesPerChecksum) {
+ throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
+ bufSize + ") is not large enough to hold a single chunk (" +
+ bytesPerChecksum + "). Please configure " +
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
}
// Round down to nearest chunk size
- return bufferSizeBytes / bytesPerChecksum;
+ return bufSize / bytesPerChecksum;
}
- private BlockReaderLocal(Configuration conf, String hdfsfile,
-      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
- long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
- throws IOException {
- this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
- DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 4), false,
- dataIn, startOffset, null);
- }
-
- private BlockReaderLocal(Configuration conf, String hdfsfile,
-      ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
- long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
- boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
- FileInputStream checksumIn) throws IOException {
- this.filename = hdfsfile;
- this.checksum = checksum;
- this.verifyChecksum = verifyChecksum;
- this.startOffset = Math.max(startOffset, 0);
-
- bytesPerChecksum = this.checksum.getBytesPerChecksum();
- checksumSize = this.checksum.getChecksumSize();
-
+ public BlockReaderLocal(Configuration conf, String filename,
+ ExtendedBlock block, long startOffset, long length,
+ FileInputStream dataIn, FileInputStream checksumIn,
+ DatanodeID datanodeID, boolean verifyChecksum) throws IOException {
this.dataIn = dataIn;
this.checksumIn = checksumIn;
- this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
+ this.startOffset = Math.max(startOffset, 0);
+ this.filename = filename;
+ this.datanodeID = datanodeID;
+ this.block = block;
- int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
- slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
- checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
- // Initially the buffers have nothing to read.
- slowReadBuff.flip();
- checksumBuff.flip();
+ // read and handle the common header here. For now just a version
+ checksumIn.getChannel().position(0);
+ BlockMetadataHeader header = BlockMetadataHeader
+ .readHeader(new DataInputStream(checksumIn));
+ short version = header.getVersion();
+ if (version != BlockMetadataHeader.VERSION) {
+ throw new IOException("Wrong version (" + version + ") of the " +
+ "metadata file for " + filename + ".");
+ }
+ if (!verifyChecksum) {
+ this.verifyChecksum = false;
+ } else {
+ this.verifyChecksum = !conf.getBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+ }
+ long firstChunkOffset;
+ if (this.verifyChecksum) {
+ this.checksum = header.getChecksum();
+ this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ this.checksumSize = this.checksum.getChecksumSize();
+ firstChunkOffset = startOffset
+ - (startOffset % checksum.getBytesPerChecksum());
+ this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
+
+ int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
+ slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
+ checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
+ // Initially the buffers have nothing to read.
+ slowReadBuff.flip();
+ checksumBuff.flip();
+ long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
+ IOUtils.skipFully(checksumIn, checkSumOffset);
+ } else {
+ firstChunkOffset = startOffset;
+ this.checksum = null;
+ this.bytesPerChecksum = 0;
+ this.checksumSize = 0;
+ this.offsetFromChunkBoundary = 0;
+ }
+
boolean success = false;
try {
- // Skip both input streams to beginning of the chunk containing startOffset
- IOUtils.skipFully(dataIn, firstChunkOffset);
- if (checksumIn != null) {
- long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
- IOUtils.skipFully(checksumIn, checkSumOffset);
- }
+      // Reposition the data stream to the beginning of the chunk
+      // containing startOffset (the checksum stream was positioned above).
+ this.dataIn.getChannel().position(firstChunkOffset);
success = true;
} finally {
if (!success) {
- bufferPool.returnBuffer(slowReadBuff);
- bufferPool.returnBuffer(checksumBuff);
+ if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
+ if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
}
}
}
@@ -649,9 +485,17 @@ class BlockReaderLocal implements BlockReader {
}
@Override
- public synchronized void close(PeerCache peerCache) throws IOException {
- dataIn.close();
- if (checksumIn != null) {
+ public synchronized void close(PeerCache peerCache,
+ FileInputStreamCache fisCache) throws IOException {
+ if (fisCache != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putting FileInputStream for " + filename +
+ " back into FileInputStreamCache");
+ }
+ fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn});
+ } else {
+ LOG.debug("closing FileInputStream for " + filename);
+ dataIn.close();
checksumIn.close();
}
if (slowReadBuff != null) {
@@ -675,4 +519,10 @@ class BlockReaderLocal implements BlockReader {
public void readFully(byte[] buf, int off, int len) throws IOException {
BlockReaderUtil.readFully(this, buf, off, len);
}
+
+ @Override
+ public int available() throws IOException {
+ // We never do network I/O in BlockReaderLocal.
+ return Integer.MAX_VALUE;
+ }
}
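
The constructor's chunk-alignment arithmetic is easier to follow with concrete numbers. A small worked sketch of the same three expressions; the values are illustrative and not taken from the patch:

```java
// Worked example of the offset math in the BlockReaderLocal constructor.
public class ChunkOffsetMath {
  public static void main(String[] args) {
    long startOffset = 1300;     // byte the caller asked to start reading at
    int bytesPerChecksum = 512;  // from the metadata header's DataChecksum
    int checksumSize = 4;        // e.g. a CRC32 checksum

    // Reads begin on a chunk boundary so the first chunk can be verified.
    long firstChunkOffset = startOffset - (startOffset % bytesPerChecksum);
    int offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);

    // The checksum file stores checksumSize bytes per data chunk (after its
    // header), so it is advanced by the number of whole chunks skipped.
    long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;

    System.out.println(firstChunkOffset);        // 1024
    System.out.println(offsetFromChunkBoundary); // 276
    System.out.println(checkSumOffset);          // 8
  }
}
```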
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 8230b992cff..73230671570 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -128,7 +128,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -227,6 +226,11 @@ public class DFSClient implements java.io.Closeable {
final boolean getHdfsBlocksMetadataEnabled;
final int getFileBlockStorageLocationsNumThreads;
final int getFileBlockStorageLocationsTimeout;
+ final String domainSocketPath;
+ final boolean skipShortCircuitChecksums;
+ final int shortCircuitBufferSize;
+ final boolean shortCircuitLocalReads;
+ final boolean domainSocketDataTraffic;
Conf(Configuration conf) {
maxFailoverAttempts = conf.getInt(
@@ -288,6 +292,19 @@ public class DFSClient implements java.io.Closeable {
getFileBlockStorageLocationsTimeout = conf.getInt(
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
+ domainSocketPath = conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+ skipShortCircuitChecksums = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
+ shortCircuitBufferSize = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
+ shortCircuitLocalReads = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
+ domainSocketDataTraffic = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
+ DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
}
private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -345,7 +362,7 @@ public class DFSClient implements java.io.Closeable {
private final Map<String, DFSOutputStream> filesBeingWritten
= new HashMap<String, DFSOutputStream>();
- private boolean shortCircuitLocalReads;
+ private final DomainSocketFactory domainSocketFactory;
/**
* Same as this(NameNode.getAddress(conf), conf);
@@ -417,12 +434,8 @@ public class DFSClient implements java.io.Closeable {
}
// read directly from the block file if configured.
- this.shortCircuitLocalReads = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
- DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Short circuit read is " + shortCircuitLocalReads);
- }
+ this.domainSocketFactory = new DomainSocketFactory(dfsClientConf);
+
String localInterfaces[] =
conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
@@ -787,28 +800,11 @@ public class DFSClient implements java.io.Closeable {
AccessControlException.class);
}
}
-
- /**
- * Get {@link BlockReader} for short circuited local reads.
- */
- static BlockReader getLocalBlockReader(Configuration conf,
-      String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
- DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
- boolean connectToDnViaHostname) throws InvalidToken, IOException {
- try {
- return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
- chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
- - offsetIntoBlock, connectToDnViaHostname);
- } catch (RemoteException re) {
- throw re.unwrapRemoteException(InvalidToken.class,
- AccessControlException.class);
- }
- }
private static Map<String, Boolean> localAddrMap = Collections
.synchronizedMap(new HashMap<String, Boolean>());
- private static boolean isLocalAddress(InetSocketAddress targetAddr) {
+ static boolean isLocalAddress(InetSocketAddress targetAddr) {
InetAddress addr = targetAddr.getAddress();
Boolean cached = localAddrMap.get(addr.getHostAddress());
if (cached != null) {
@@ -2108,10 +2104,6 @@ public class DFSClient implements java.io.Closeable {
super(in);
}
}
-
- boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
- return shortCircuitLocalReads && isLocalAddress(targetAddr);
- }
void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
DatanodeInfo [] dnArr = { dn };
@@ -2135,7 +2127,7 @@ public class DFSClient implements java.io.Closeable {
+ ", ugi=" + ugi + "]";
}
- void disableShortCircuit() {
- shortCircuitLocalReads = false;
+ public DomainSocketFactory getDomainSocketFactory() {
+ return domainSocketFactory;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c62e9f7fc26..d0b29b71aa8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -342,7 +342,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY = "dfs.client.read.shortcircuit.streams.cache.size";
+ public static final int DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT = 10;
+ public static final String DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY = "dfs.client.read.shortcircuit.streams.cache.expiry.ms";
+ public static final long DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT = 60000;
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
+ public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
+ public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -393,6 +399,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
+ public static final String DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY = "dfs.datanode.domain.socket.path";
// HA related configuration
public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
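
For reference, a client can exercise the keys added in this file either through hdfs-site.xml or programmatically. A hedged sketch using only keys defined in this patch; the socket path and sizes are example values, not recommendations:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Example client configuration for the new short-circuit/domain-socket knobs.
public class ShortCircuitConfExample {
  public static Configuration create() {
    Configuration conf = new Configuration();
    // Path on which the DataNode listens for UNIX domain socket connections.
    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
        "/var/run/hdfs/dn_socket");
    // Request file-descriptor-based short-circuit local reads.
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    // Optionally allow block data itself over the domain socket as a fallback
    // (disabled by default in this patch).
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true);
    // Sizing for the new FileInputStreamCache.
    conf.setInt(
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY, 10);
    conf.setLong(
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
        60000L);
    return conf;
  }
}
```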
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index aec1726a3ab..a33f8f54e7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -38,7 +39,7 @@ import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.net.EncryptedPeer;
+import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -46,17 +47,16 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hdfs.FileInputStreamCache;
/****************************************************************
* DFSInputStream provides bytes from a named file. It handles
@@ -80,6 +80,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
private long pos = 0;
private long blockEnd = -1;
+ private final FileInputStreamCache fileInputStreamCache;
+
/**
* This variable tracks the number of failures since the start of the
* most recent user-facing operation. That is to say, it should be reset
@@ -115,6 +117,13 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
this.buffersize = buffersize;
this.src = src;
this.peerCache = dfsClient.peerCache;
+ this.fileInputStreamCache = new FileInputStreamCache(
+ dfsClient.conf.getInt(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
+ dfsClient.conf.getLong(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT));
prefetchSize = dfsClient.getConf().prefetchSize;
timeWindow = dfsClient.getConf().timeWindow;
nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
@@ -247,7 +256,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
}
- private synchronized boolean blockUnderConstruction() {
+ // Short circuit local reads are forbidden for files that are
+ // under construction. See HDFS-2757.
+ synchronized boolean shortCircuitForbidden() {
return locatedBlocks.isUnderConstruction();
}
@@ -428,7 +439,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
// Will be getting a new BlockReader.
if (blockReader != null) {
- blockReader.close(peerCache);
+ blockReader.close(peerCache, fileInputStreamCache);
blockReader = null;
}
@@ -510,10 +521,11 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
dfsClient.checkOpen();
if (blockReader != null) {
- blockReader.close(peerCache);
+ blockReader.close(peerCache, fileInputStreamCache);
blockReader = null;
}
super.close();
+ fileInputStreamCache.close();
closed = true;
}
@@ -809,10 +821,6 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
e.getPos() + " from " + chosenNode);
// we want to remember what we have tried
addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
- } catch (AccessControlException ex) {
- DFSClient.LOG.warn("Short circuit access failed ", ex);
- dfsClient.disableShortCircuit();
- continue;
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
@@ -837,7 +845,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
}
} finally {
if (reader != null) {
- reader.close(peerCache);
+ reader.close(peerCache, fileInputStreamCache);
}
}
// Put chosen node into dead list, continue
@@ -849,19 +857,29 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
Peer peer = null;
boolean success = false;
Socket sock = null;
+ DomainSocket domSock = null;
+
try {
- sock = dfsClient.socketFactory.createSocket();
- NetUtils.connect(sock, addr,
- dfsClient.getRandomLocalInterfaceAddr(),
- dfsClient.getConf().socketTimeout);
- peer = TcpPeerServer.peerFromSocketAndKey(sock,
- dfsClient.getDataEncryptionKey());
+ domSock = dfsClient.getDomainSocketFactory().create(addr, this);
+ if (domSock != null) {
+ // Create a UNIX Domain peer.
+ peer = new DomainPeer(domSock);
+ } else {
+ // Create a conventional TCP-based Peer.
+ sock = dfsClient.socketFactory.createSocket();
+ NetUtils.connect(sock, addr,
+ dfsClient.getRandomLocalInterfaceAddr(),
+ dfsClient.getConf().socketTimeout);
+ peer = TcpPeerServer.peerFromSocketAndKey(sock,
+ dfsClient.getDataEncryptionKey());
+ }
success = true;
return peer;
} finally {
if (!success) {
IOUtils.closeQuietly(peer);
IOUtils.closeQuietly(sock);
+ IOUtils.closeQuietly(domSock);
}
}
}
@@ -895,49 +913,77 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
String clientName)
throws IOException {
- // Can't local read a block under construction, see HDFS-2757
- if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
- !blockUnderConstruction()) {
- return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
- blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
- dfsClient.connectToDnViaHostname());
- }
-
IOException err = null;
- boolean fromCache = true;
- // Allow retry since there is no way of knowing whether the cached socket
- // is good until we actually use it.
- for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
+ // Firstly, we check to see if we have cached any file descriptors for
+ // local blocks. If so, we can just re-use those file descriptors.
+ FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
+ if (fis != null) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
+ "the FileInputStreamCache.");
+ }
+ return new BlockReaderLocal(dfsClient.conf, file,
+ block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum);
+ }
+
+ // We retry several times here.
+ // On the first nCachedConnRetry times, we try to fetch a socket from
+ // the socketCache and use it. This may fail, since the old socket may
+ // have been closed by the peer.
+ // After that, we try to create a new socket using newPeer().
+ // This may create either a TCP socket or a UNIX domain socket, depending
+ // on the configuration and whether the peer is remote.
+ // If we try to create a UNIX domain socket and fail, we will not try that
+ // again. Instead, we'll try to create a TCP socket. Only after we've
+ // failed to create a TCP-based BlockReader will we throw an IOException
+ // from this function. Throwing an IOException from here is basically
+ // equivalent to declaring the DataNode bad.
+ boolean triedNonDomainSocketReader = false;
+ for (int retries = 0;
+ retries < nCachedConnRetry && (!triedNonDomainSocketReader);
+ ++retries) {
Peer peer = null;
- // Don't use the cache on the last attempt - it's possible that there
- // are arbitrarily many unusable sockets in the cache, but we don't
- // want to fail the read.
if (retries < nCachedConnRetry) {
peer = peerCache.get(chosenNode);
}
if (peer == null) {
peer = newPeer(dnAddr);
- fromCache = false;
+ if (peer.getDomainSocket() == null) {
+ triedNonDomainSocketReader = true;
+ }
}
-
+ boolean success = false;
try {
- // The OP_READ_BLOCK request is sent as we make the BlockReader
- BlockReader reader =
- BlockReaderFactory.newBlockReader(dfsClient.conf,
- file, block,
- blockToken,
- startOffset, len,
- verifyChecksum,
- clientName,
- peer,
- chosenNode);
- return reader;
- } catch (IOException ex) {
- // Our socket is no good.
- DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex);
- IOUtils.closeQuietly(peer);
+ boolean allowShortCircuitLocalReads =
+ (peer.getDomainSocket() != null) &&
+ dfsClient.getConf().shortCircuitLocalReads &&
+ (!shortCircuitForbidden());
+ // Here we will try to send either an OP_READ_BLOCK request or an
+ // OP_REQUEST_SHORT_CIRCUIT_FDS, depending on what kind of block reader
+ // we're trying to create.
+ BlockReader blockReader = BlockReaderFactory.newBlockReader(
+ dfsClient.conf, file, block, blockToken, startOffset,
+ len, verifyChecksum, clientName, peer, chosenNode,
+ dfsClient.getDomainSocketFactory(), allowShortCircuitLocalReads);
+ success = true;
+ return blockReader;
+ } catch (IOException ex) {
+ // Our socket is no good.
+ DFSClient.LOG.debug("Error making BlockReader. " +
+ "Closing stale " + peer, ex);
+ if (peer.getDomainSocket() != null) {
+ // If the Peer that we got the error from was a DomainPeer,
+ // mark the socket path as bad, so that newDataSocket will not try
+ // to re-open this socket for a while.
+ dfsClient.getDomainSocketFactory().
+ disableDomainSocketPath(peer.getDomainSocket().getPath());
+ }
err = ex;
+ } finally {
+ if (!success) {
+ IOUtils.closeQuietly(peer);
+ }
}
}
@@ -1075,7 +1121,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
// the TCP buffer, then just eat up the intervening data.
//
int diff = (int)(targetPos - pos);
- if (diff <= DFSClient.TCP_WINDOW_SIZE) {
+ if (diff <= blockReader.available()) {
try {
pos += blockReader.skip(diff);
if (pos == targetPos) {
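
Replacing the fixed DFSClient.TCP_WINDOW_SIZE bound with blockReader.available() means a BlockReaderLocal (which reports Integer.MAX_VALUE) always skips forward within the current block, while remote readers still limit the in-place skip to roughly one TCP window. A simplified sketch of that decision; the method and its parameters stand in for DFSInputStream state:

```java
import java.io.IOException;
import org.apache.hadoop.hdfs.BlockReader;

// Simplified sketch of the seek-within-block decision after this patch.
public class SeekWithinBlockSketch {
  static boolean trySkipForward(BlockReader blockReader, long pos,
      long targetPos) throws IOException {
    int diff = (int) (targetPos - pos);
    // Local readers report Integer.MAX_VALUE (no network I/O), so they always
    // take this path; remote readers only do when the gap fits the estimate.
    if (diff > 0 && diff <= blockReader.available()) {
      return blockReader.skip(diff) == diff;   // keep using the same reader
    }
    return false;  // caller falls back to re-opening the block at targetPos
  }
}
```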
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
new file mode 100644
index 00000000000..50b60521070
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DomainSocketFactory.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+class DomainSocketFactory {
+ public static final Log LOG = LogFactory.getLog(DomainSocketFactory.class);
+ private final Conf conf;
+
+ enum PathStatus {
+ UNUSABLE,
+ SHORT_CIRCUIT_DISABLED,
+ }
+
+ /**
+ * Information about domain socket paths.
+ */
+  Cache<String, PathStatus> pathInfo =
+ CacheBuilder.newBuilder()
+ .expireAfterWrite(10, TimeUnit.MINUTES)
+ .build();
+
+ public DomainSocketFactory(Conf conf) {
+ this.conf = conf;
+
+ String feature = null;
+ if (conf.shortCircuitLocalReads) {
+ feature = "The short-circuit local reads feature";
+ } else if (conf.domainSocketDataTraffic) {
+ feature = "UNIX domain socket data traffic";
+ }
+ if (feature != null) {
+ if (conf.domainSocketPath == null) {
+ LOG.warn(feature + " is disabled because you have not set " +
+ DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+ } else if (DomainSocket.getLoadingFailureReason() != null) {
+ LOG.error(feature + " is disabled because " +
+ DomainSocket.getLoadingFailureReason());
+ } else {
+        LOG.debug(feature + " is enabled.");
+ }
+ }
+ }
+
+ /**
+ * Create a DomainSocket.
+ *
+ * @param addr The address of the DataNode
+ * @param stream The DFSInputStream the socket will be created for.
+ *
+ * @return null if the socket could not be created; the
+ * socket otherwise. If there was an error while
+ * creating the socket, we will add the socket path
+ * to our list of failed domain socket paths.
+ */
+ DomainSocket create(InetSocketAddress addr, DFSInputStream stream) {
+ // If there is no domain socket path configured, we can't use domain
+ // sockets.
+ if (conf.domainSocketPath == null) return null;
+ // UNIX domain sockets can only be used to talk to local peers
+ if (!DFSClient.isLocalAddress(addr)) return null;
+ // If the DomainSocket code is not loaded, we can't create
+ // DomainSocket objects.
+ if (DomainSocket.getLoadingFailureReason() != null) return null;
+ String escapedPath = DomainSocket.
+ getEffectivePath(conf.domainSocketPath, addr.getPort());
+ PathStatus info = pathInfo.getIfPresent(escapedPath);
+ if (info == PathStatus.UNUSABLE) {
+ // We tried to connect to this domain socket before, and it was totally
+ // unusable.
+ return null;
+ }
+ if ((!conf.domainSocketDataTraffic) &&
+ ((info == PathStatus.SHORT_CIRCUIT_DISABLED) ||
+ stream.shortCircuitForbidden())) {
+ // If we don't want to pass data over domain sockets, and we don't want
+ // to pass file descriptors over them either, we have no use for domain
+ // sockets.
+ return null;
+ }
+ boolean success = false;
+ DomainSocket sock = null;
+ try {
+ sock = DomainSocket.connect(escapedPath);
+ sock.setAttribute(DomainSocket.RCV_TIMEO, conf.socketTimeout);
+ success = true;
+ } catch (IOException e) {
+ LOG.error("error creating DomainSocket", e);
+ // fall through
+ } finally {
+ if (!success) {
+ if (sock != null) {
+ IOUtils.closeQuietly(sock);
+ }
+ pathInfo.put(escapedPath, PathStatus.UNUSABLE);
+ sock = null;
+ }
+ }
+ return sock;
+ }
+
+ public void disableShortCircuitForPath(String path) {
+ pathInfo.put(path, PathStatus.SHORT_CIRCUIT_DISABLED);
+ }
+
+ public void disableDomainSocketPath(String path) {
+ pathInfo.put(path, PathStatus.UNUSABLE);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
new file mode 100644
index 00000000000..d9045f0cf2c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/FileInputStreamCache.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileInputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * FileInputStream cache is used to cache FileInputStream objects that we
+ * have received from the DataNode.
+ */
+class FileInputStreamCache {
+ private final static Log LOG = LogFactory.getLog(FileInputStreamCache.class);
+
+ /**
+ * The executor service that runs the cacheCleaner. There is only one of
+ * these per VM.
+ */
+ private final static ScheduledThreadPoolExecutor executor
+ = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+ setDaemon(true).setNameFormat("FileInputStreamCache Cleaner").
+ build());
+
+ /**
+ * The CacheCleaner for this FileInputStreamCache. We don't create this
+ * and schedule it until it becomes necessary.
+ */
+ private CacheCleaner cacheCleaner;
+
+ /**
+ * Maximum number of entries to allow in the cache.
+ */
+ private final int maxCacheSize;
+
+ /**
+ * The minimum time in milliseconds to preserve an element in the cache.
+ */
+ private final long expiryTimeMs;
+
+ /**
+ * True if the FileInputStreamCache is closed.
+ */
+ private boolean closed = false;
+
+ /**
+ * Cache entries.
+ */
+  private final LinkedListMultimap<Key, Value> map = LinkedListMultimap.create();
+
+ /**
+ * Expiry thread which makes sure that the file descriptors get closed
+ * after a while.
+ */
+ class CacheCleaner implements Runnable {
+ @Override
+ public void run() {
+ synchronized(FileInputStreamCache.this) {
+ if (closed) return;
+ long curTime = Time.monotonicNow();
+      for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+ iter.hasNext();
+ iter = map.entries().iterator()) {
+        Entry<Key, Value> entry = iter.next();
+ if (entry.getValue().getTime() + expiryTimeMs >= curTime) {
+ break;
+ }
+ entry.getValue().close();
+ iter.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * The key identifying a FileInputStream array.
+ */
+ static class Key {
+ private final DatanodeID datanodeID;
+ private final ExtendedBlock block;
+
+ public Key(DatanodeID datanodeID, ExtendedBlock block) {
+ this.datanodeID = datanodeID;
+ this.block = block;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof FileInputStreamCache.Key)) {
+ return false;
+ }
+ FileInputStreamCache.Key otherKey = (FileInputStreamCache.Key)other;
+      return (block.equals(otherKey.block) &&
+          (block.getGenerationStamp() == otherKey.block.getGenerationStamp()) &&
+ datanodeID.equals(otherKey.datanodeID));
+ }
+
+ @Override
+ public int hashCode() {
+ return block.hashCode();
+ }
+ }
+
+ /**
+ * The value containing a FileInputStream array and the time it was added to
+ * the cache.
+ */
+ static class Value {
+ private final FileInputStream fis[];
+ private final long time;
+
+ public Value (FileInputStream fis[]) {
+ this.fis = fis;
+ this.time = Time.monotonicNow();
+ }
+
+ public FileInputStream[] getFileInputStreams() {
+ return fis;
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public void close() {
+ IOUtils.cleanup(LOG, fis);
+ }
+ }
+
+ /**
+   * Create a new FileInputStreamCache.
+ *
+ * @param maxCacheSize The maximum number of elements to allow in
+ * the cache.
+ * @param expiryTimeMs The minimum time in milliseconds to preserve
+ * elements in the cache.
+ */
+ public FileInputStreamCache(int maxCacheSize, long expiryTimeMs) {
+ this.maxCacheSize = maxCacheSize;
+ this.expiryTimeMs = expiryTimeMs;
+ }
+
+ /**
+ * Put an array of FileInputStream objects into the cache.
+ *
+ * @param datanodeID The DatanodeID to store the streams under.
+ * @param block The Block to store the streams under.
+ * @param fis The streams.
+ */
+ public void put(DatanodeID datanodeID, ExtendedBlock block,
+ FileInputStream fis[]) {
+ boolean inserted = false;
+ try {
+ synchronized(this) {
+ if (closed) return;
+ if (map.size() + 1 > maxCacheSize) {
+          Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+ if (!iter.hasNext()) return;
+          Entry<Key, Value> entry = iter.next();
+ entry.getValue().close();
+ iter.remove();
+ }
+ if (cacheCleaner == null) {
+ cacheCleaner = new CacheCleaner();
+ executor.scheduleAtFixedRate(cacheCleaner, expiryTimeMs, expiryTimeMs,
+ TimeUnit.MILLISECONDS);
+ }
+ map.put(new Key(datanodeID, block), new Value(fis));
+ inserted = true;
+ }
+ } finally {
+ if (!inserted) {
+ IOUtils.cleanup(LOG, fis);
+ }
+ }
+ }
+
+ /**
+ * Find and remove an array of FileInputStream objects from the cache.
+ *
+ * @param datanodeID The DatanodeID to search for.
+ * @param block The Block to search for.
+ *
+ * @return null if no streams can be found; the
+ * array otherwise. If this is non-null, the
+ * array will have been removed from the cache.
+ */
+ public synchronized FileInputStream[] get(DatanodeID datanodeID,
+ ExtendedBlock block) {
+ Key key = new Key(datanodeID, block);
+    List<Value> ret = map.get(key);
+ if (ret.isEmpty()) return null;
+ Value val = ret.get(0);
+ map.remove(key, val);
+ return val.getFileInputStreams();
+ }
+
+ /**
+ * Close the cache and free all associated resources.
+ */
+ public synchronized void close() {
+ if (closed) return;
+ closed = true;
+ if (cacheCleaner != null) {
+ executor.remove(cacheCleaner);
+ }
+ for (Iterator<Entry<Key, Value>> iter = map.entries().iterator();
+ iter.hasNext();
+ iter = map.entries().iterator()) {
+ Entry<Key, Value> entry = iter.next();
+ entry.getValue().close();
+ iter.remove();
+ }
+ }
+
+ public synchronized String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("FileInputStreamCache(");
+ String prefix = "";
+ for (Entry<Key, Value> entry : map.entries()) {
+ bld.append(prefix);
+ bld.append(entry.getKey());
+ prefix = ", ";
+ }
+ bld.append(")");
+ return bld.toString();
+ }
+
+ public long getExpiryTimeMs() {
+ return expiryTimeMs;
+ }
+
+ public int getMaxCacheSize() {
+ return maxCacheSize;
+ }
+}
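For reference, a minimal usage sketch of the cache added above (not part of the patch). It assumes a class placed in the same org.apache.hadoop.hdfs package; the DatanodeID and ExtendedBlock constructor arguments mirror the ones used in TestFileInputStreamCache later in this change, and the streams parameter stands in for however the block data and metadata streams are obtained.

import java.io.FileInputStream;

import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

class FileInputStreamCacheSketch {
  static void demo(FileInputStream[] streams /* { dataIn, metaIn } */) {
    // Hold at most 16 descriptor pairs; expire entries after five minutes.
    FileInputStreamCache cache = new FileInputStreamCache(16, 5 * 60 * 1000);
    DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost",
        "storage-1", 8080, 9090, 7070);
    ExtendedBlock block = new ExtendedBlock("poolid", 123);

    cache.put(dnId, block, streams);      // the cache now owns the streams
    FileInputStream[] cached = cache.get(dnId, block);
    if (cached != null) {
      // The entry was removed from the cache; the caller must close
      // these streams when it is done with them.
    }
    cache.close();                        // closes anything still cached
  }
}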
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 2a9523351c1..99f1db99057 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -413,7 +413,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
}
@Override
- public synchronized void close(PeerCache peerCache) throws IOException {
+ public synchronized void close(PeerCache peerCache,
+ FileInputStreamCache fisCache) throws IOException {
startOffset = -1;
checksum = null;
if (peerCache != null && sentStatusCode) {
@@ -470,4 +471,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
public int read(ByteBuffer buf) throws IOException {
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
}
+
+ @Override
+ public int available() throws IOException {
+ // An optimistic estimate of how much data is available
+ // to us without doing network I/O.
+ return DFSClient.TCP_WINDOW_SIZE;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 007249b7922..6961f382dfe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -275,7 +275,8 @@ public class RemoteBlockReader2 implements BlockReader {
@Override
- public synchronized void close(PeerCache peerCache) throws IOException {
+ public synchronized void close(PeerCache peerCache,
+ FileInputStreamCache fisCache) throws IOException {
packetReceiver.close();
startOffset = -1;
checksum = null;
@@ -422,4 +423,11 @@ public class RemoteBlockReader2 implements BlockReader {
}
}
}
+
+ @Override
+ public int available() throws IOException {
+ // An optimistic estimate of how much data is available
+ // to us without doing network I/O.
+ return DFSClient.TCP_WINDOW_SIZE;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
index d22584fc787..bded892e8ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.net;
-import java.io.Closeable;
import java.io.IOException;
import java.net.SocketTimeoutException;
@@ -27,8 +26,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.classification.InterfaceAudience;
-class DomainPeerServer implements PeerServer {
+@InterfaceAudience.Private
+public class DomainPeerServer implements PeerServer {
static Log LOG = LogFactory.getLog(DomainPeerServer.class);
private final DomainSocket sock;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index 98094472a73..b584f3b1989 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -104,6 +104,18 @@ public interface DataTransferProtocol {
final String clientName,
final DatanodeInfo[] targets) throws IOException;
+ /**
+ * Request short circuit access file descriptors from a DataNode.
+ *
+ * @param blk The block to get file descriptors for.
+ * @param blockToken Security token for accessing the block.
+ * @param maxVersion Maximum version of the block data the client
+ * can understand.
+ */
+ public void requestShortCircuitFds(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ int maxVersion) throws IOException;
+
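For orientation, a minimal client-side fragment (not part of the patch) showing how this operation would be issued through the Sender implementation added further below; "out" is assumed to be the DataOutputStream of an already-established DataTransferProtocol connection over a UNIX domain socket, and the literal 1 stands for the current block format version.

// Hypothetical caller fragment; block and blockToken would come from a LocatedBlock.
new Sender(out).requestShortCircuitFds(block, blockToken, 1);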
/**
* Receive a block from a source datanode
* and then notifies the namenode
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
index 4b9b47fe3db..d64e83e0329 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
@@ -34,7 +34,8 @@ public enum Op {
REPLACE_BLOCK((byte)83),
COPY_BLOCK((byte)84),
BLOCK_CHECKSUM((byte)85),
- TRANSFER_BLOCK((byte)86);
+ TRANSFER_BLOCK((byte)86),
+ REQUEST_SHORT_CIRCUIT_FDS((byte)87);
/** The code for this operation. */
public final byte code;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index ff7a81babd7..260a0a6e212 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
/** Receiver */
@@ -77,6 +78,9 @@ public abstract class Receiver implements DataTransferProtocol {
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
+ case REQUEST_SHORT_CIRCUIT_FDS:
+ opRequestShortCircuitFds(in);
+ break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
@@ -117,6 +121,15 @@ public abstract class Receiver implements DataTransferProtocol {
fromProtos(proto.getTargetsList()));
}
+ /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
+ private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
+ final OpRequestShortCircuitAccessProto proto =
+ OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
+ requestShortCircuitFds(fromProto(proto.getHeader().getBlock()),
+ fromProto(proto.getHeader().getToken()),
+ proto.getMaxVersion());
+ }
+
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 03e13080612..f117cdf9745 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
@@ -135,6 +136,17 @@ public class Sender implements DataTransferProtocol {
send(out, Op.TRANSFER_BLOCK, proto);
}
+ @Override
+ public void requestShortCircuitFds(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> blockToken,
+ int maxVersion) throws IOException {
+ OpRequestShortCircuitAccessProto proto =
+ OpRequestShortCircuitAccessProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildBaseHeader(
+ blk, blockToken)).setMaxVersion(maxVersion).build();
+ send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
+ }
+
@Override
public void replaceBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
index fe4eac1a6e5..4b273d3344e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
@@ -213,7 +213,7 @@ public class JspHelper {
offsetIntoBlock, amtToRead, true,
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
new DatanodeID(addr.getAddress().toString(),
- addr.getHostName(), poolId, addr.getPort(), 0, 0));
+ addr.getHostName(), poolId, addr.getPort(), 0, 0), null, false);
byte[] buf = new byte[(int)amtToRead];
int readOffset = 0;
@@ -232,8 +232,7 @@ public class JspHelper {
amtToRead -= numRead;
readOffset += numRead;
}
- blockReader = null;
- s.close();
+ blockReader.close(null, null);
out.print(HtmlQuoting.quoteHtmlChars(new String(buf)));
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 8045481538e..4d1889eb624 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -53,16 +53,15 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.AbstractList;
@@ -90,6 +89,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -149,11 +149,11 @@ import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -233,6 +233,7 @@ public class DataNode extends Configured
LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
private static final String USAGE = "Usage: java DataNode [-rollback | -regular]";
+ static final int CURRENT_BLOCK_FORMAT_VERSION = 1;
/**
* Use {@link NetUtils#createSocketAddr(String)} instead.
@@ -250,6 +251,7 @@ public class DataNode extends Configured
public final static String EMPTY_DEL_HINT = "";
AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null;
+ Daemon localDataXceiverServer = null;
ThreadGroup threadGroup = null;
private DNConf dnConf;
private volatile boolean heartbeatsDisabledForTests = false;
@@ -261,6 +263,7 @@ public class DataNode extends Configured
private String hostName;
private DatanodeID id;
+ final private String fileDescriptorPassingDisabledReason;
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
private boolean hasAnyBlockPoolRegistered = false;
@@ -309,6 +312,24 @@ public class DataNode extends Configured
this.getHdfsBlockLocationsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+
+ // Determine whether we should try to pass file descriptors to clients.
+ if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
+ String reason = DomainSocket.getLoadingFailureReason();
+ if (reason != null) {
+ LOG.warn("File descriptor passing is disabled because " + reason);
+ this.fileDescriptorPassingDisabledReason = reason;
+ } else {
+ LOG.info("File descriptor passing is enabled.");
+ this.fileDescriptorPassingDisabledReason = null;
+ }
+ } else {
+ this.fileDescriptorPassingDisabledReason =
+ "File descriptor passing was not configured.";
+ LOG.debug(this.fileDescriptorPassingDisabledReason);
+ }
+
try {
hostName = getHostName(conf);
LOG.info("Configured hostname is " + hostName);
@@ -537,6 +558,41 @@ public class DataNode extends Configured
this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(tcpPeerServer, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty
+
+ DomainPeerServer domainPeerServer =
+ getDomainPeerServer(conf, streamingAddr.getPort());
+ if (domainPeerServer != null) {
+ this.localDataXceiverServer = new Daemon(threadGroup,
+ new DataXceiverServer(domainPeerServer, conf, this));
+ LOG.info("Listening on UNIX domain socket: " +
+ domainPeerServer.getBindPath());
+ }
+ }
+
+ static DomainPeerServer getDomainPeerServer(Configuration conf,
+ int port) throws IOException {
+ String domainSocketPath =
+ conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+ if (domainSocketPath == null) {
+ if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
+ DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
+ LOG.warn("Although short-circuit local reads are configured, " +
+ "they are disabled because you didn't configure " +
+ DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY);
+ }
+ return null;
+ }
+ if (DomainSocket.getLoadingFailureReason() != null) {
+ throw new RuntimeException("Although a UNIX domain socket " +
+ "path is configured as " + domainSocketPath + ", we cannot " +
+ "start a localDataXceiverServer because " +
+ DomainSocket.getLoadingFailureReason());
+ }
+ DomainPeerServer domainPeerServer =
+ new DomainPeerServer(domainSocketPath, port);
+ domainPeerServer.setReceiveBufferSize(
+ HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
+ return domainPeerServer;
}
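For context, an illustrative configuration fragment (example values, not defaults) under which getDomainPeerServer() returns a listening server; the "%d" placeholder mirrors the test configuration later in this patch and is assumed to be expanded with the DataNode's port.

// Sketch: enable the domain-socket data path alongside short-circuit reads.
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
    "/var/run/hdfs-sockets/dn.%d.sock");
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);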
// calls specific to BP
@@ -1039,6 +1095,42 @@ public class DataNode extends Configured
return info;
}
+ @InterfaceAudience.LimitedPrivate("HDFS")
+ static public class ShortCircuitFdsUnsupportedException extends IOException {
+ private static final long serialVersionUID = 1L;
+ public ShortCircuitFdsUnsupportedException(String msg) {
+ super(msg);
+ }
+ }
+
+ @InterfaceAudience.LimitedPrivate("HDFS")
+ static public class ShortCircuitFdsVersionException extends IOException {
+ private static final long serialVersionUID = 1L;
+ public ShortCircuitFdsVersionException(String msg) {
+ super(msg);
+ }
+ }
+
+ FileInputStream[] requestShortCircuitFdsForRead(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> token, int maxVersion)
+ throws ShortCircuitFdsUnsupportedException,
+ ShortCircuitFdsVersionException, IOException {
+ if (fileDescriptorPassingDisabledReason != null) {
+ throw new ShortCircuitFdsUnsupportedException(
+ fileDescriptorPassingDisabledReason);
+ }
+ checkBlockToken(blk, token, BlockTokenSecretManager.AccessMode.READ);
+ int blkVersion = CURRENT_BLOCK_FORMAT_VERSION;
+ if (maxVersion < blkVersion) {
+ throw new ShortCircuitFdsVersionException("Your client is too old " +
+ "to read this block! Its format version is " +
+ blkVersion + ", but the highest format version you can read is " +
+ maxVersion);
+ }
+ metrics.incrBlocksGetLocalPathInfo();
+ return data.getShortCircuitFdsForRead(blk);
+ }
+
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
List<Token<BlockTokenIdentifier>> tokens) throws IOException,
@@ -1113,32 +1205,45 @@ public class DataNode extends Configured
if (dataXceiverServer != null) {
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt();
-
- // wait for all data receiver threads to exit
- if (this.threadGroup != null) {
- int sleepMs = 2;
- while (true) {
- this.threadGroup.interrupt();
- LOG.info("Waiting for threadgroup to exit, active threads is " +
- this.threadGroup.activeCount());
- if (this.threadGroup.activeCount() == 0) {
- break;
- }
- try {
- Thread.sleep(sleepMs);
- } catch (InterruptedException e) {}
- sleepMs = sleepMs * 3 / 2; // exponential backoff
- if (sleepMs > 1000) {
- sleepMs = 1000;
- }
+ }
+ if (localDataXceiverServer != null) {
+ ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
+ this.localDataXceiverServer.interrupt();
+ }
+ // wait for all data receiver threads to exit
+ if (this.threadGroup != null) {
+ int sleepMs = 2;
+ while (true) {
+ this.threadGroup.interrupt();
+ LOG.info("Waiting for threadgroup to exit, active threads is " +
+ this.threadGroup.activeCount());
+ if (this.threadGroup.activeCount() == 0) {
+ break;
+ }
+ try {
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException e) {}
+ sleepMs = sleepMs * 3 / 2; // exponential backoff
+ if (sleepMs > 1000) {
+ sleepMs = 1000;
}
}
- // wait for dataXceiveServer to terminate
+ this.threadGroup = null;
+ }
+ if (this.dataXceiverServer != null) {
+ // wait for dataXceiverServer to terminate
try {
this.dataXceiverServer.join();
} catch (InterruptedException ie) {
}
}
+ if (this.localDataXceiverServer != null) {
+ // wait for localDataXceiverServer to terminate
+ try {
+ this.localDataXceiverServer.join();
+ } catch (InterruptedException ie) {
+ }
+ }
if(blockPoolManager != null) {
try {
@@ -1523,6 +1628,9 @@ public class DataNode extends Configured
// start dataXceiveServer
dataXceiverServer.start();
+ if (localDataXceiverServer != null) {
+ localDataXceiverServer.start();
+ }
ipcServer.start();
startPlugins(conf);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index d618c787d13..2998cc61634 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
import static org.apache.hadoop.util.Time.now;
@@ -28,6 +29,8 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
@@ -60,11 +63,14 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -232,6 +238,68 @@ class DataXceiver extends Receiver implements Runnable {
}
}
+ @Override
+ public void requestShortCircuitFds(final ExtendedBlock blk,
+ final Token<BlockTokenIdentifier> token,
+ int maxVersion) throws IOException {
+ updateCurrentThreadName("Passing file descriptors for block " + blk);
+ BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
+ FileInputStream fis[] = null;
+ try {
+ if (peer.getDomainSocket() == null) {
+ throw new IOException("You cannot pass file descriptors over " +
+ "anything but a UNIX domain socket.");
+ }
+ fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
+ bld.setStatus(SUCCESS);
+ bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+ } catch (ShortCircuitFdsVersionException e) {
+ bld.setStatus(ERROR_UNSUPPORTED);
+ bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
+ bld.setMessage(e.getMessage());
+ } catch (ShortCircuitFdsUnsupportedException e) {
+ bld.setStatus(ERROR_UNSUPPORTED);
+ bld.setMessage(e.getMessage());
+ } catch (InvalidToken e) {
+ bld.setStatus(ERROR_ACCESS_TOKEN);
+ bld.setMessage(e.getMessage());
+ } catch (IOException e) {
+ bld.setStatus(ERROR);
+ bld.setMessage(e.getMessage());
+ }
+ try {
+ bld.build().writeDelimitedTo(socketOut);
+ if (fis != null) {
+ FileDescriptor fds[] = new FileDescriptor[fis.length];
+ for (int i = 0; i < fds.length; i++) {
+ fds[i] = fis[i].getFD();
+ }
+ byte buf[] = new byte[] { (byte)0 };
+ peer.getDomainSocket().
+ sendFileDescriptors(fds, buf, 0, buf.length);
+ }
+ } finally {
+ if (ClientTraceLog.isInfoEnabled()) {
+ DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
+ .getBlockPoolId());
+ BlockSender.ClientTraceLog.info(String.format(
+ "src: %s, dest: %s, op: %s, blockid: %s, srvID: %s, " +
+ "success: %b",
+ "127.0.0.1", // src IP
+ "127.0.0.1", // dst IP
+ "REQUEST_SHORT_CIRCUIT_FDS", // operation
+ blk.getBlockId(), // block id
+ dnR.getStorageID(), // DataNode storage ID
+ (fis != null) // success
+ ));
+ }
+ if (fis != null) {
+ IOUtils.cleanup(LOG, fis);
+ }
+ }
+ }
+
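For orientation, a client-side counterpart fragment (not part of the patch), sketched under two assumptions: that DomainSocket exposes a recvFileInputStreams() receive method matching the sendFileDescriptors() call above, and that vintPrefixed() is the same varint length-prefix helper the Receiver uses; "in" and "peer" denote the client's side of the connection.

// Parse the delimited BlockOpResponseProto written above, then receive
// the descriptors that accompany the one-byte payload.
BlockOpResponseProto resp =
    BlockOpResponseProto.parseFrom(vintPrefixed(in));
if (resp.getStatus() == Status.SUCCESS) {
  FileInputStream[] fis = new FileInputStream[2];
  byte[] buf = new byte[1];
  peer.getDomainSocket().recvFileInputStreams(fis, buf, 0, buf.length);
  // fis[0] is the block data file, fis[1] is the block metadata file.
}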
@Override
public void readBlock(final ExtendedBlock block,
final Token blockToken,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 13ef752750b..57e8887be81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
@@ -386,4 +387,6 @@ public interface FsDatasetSpi extends FSDatasetMBean {
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException;
+ FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+ throws IOException;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index c65e5faa405..427bfcf23a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -1668,6 +1669,26 @@ class FsDatasetImpl implements FsDatasetSpi {
return info;
}
+ @Override // FsDatasetSpi
+ public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+ throws IOException {
+ File datafile = getBlockFile(block);
+ File metafile = FsDatasetUtil.getMetaFile(datafile,
+ block.getGenerationStamp());
+ FileInputStream fis[] = new FileInputStream[2];
+ boolean success = false;
+ try {
+ fis[0] = new FileInputStream(datafile);
+ fis[1] = new FileInputStream(metafile);
+ success = true;
+ return fis;
+ } finally {
+ if (!success) {
+ IOUtils.cleanup(null, fis);
+ }
+ }
+ }
+
@Override // FsDatasetSpi
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 4dd1e808c54..e66611f8dab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -563,7 +563,7 @@ public class NamenodeFsck {
conf, file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
getDataEncryptionKey()),
- chosenNode);
+ chosenNode, null, false);
} catch (IOException ex) {
// Put chosen node into dead list, continue
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
index ca24f7a4a44..7659f2ee8fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientDatanodeProtocol.proto
@@ -74,6 +74,8 @@ message DeleteBlockPoolResponseProto {
* Gets the file information where block and its metadata is stored
* block - block for which path information is being requested
* token - block token
+ *
+ * This message is deprecated in favor of file descriptor passing.
*/
message GetBlockLocalPathInfoRequestProto {
required ExtendedBlockProto block = 1;
@@ -84,6 +86,8 @@ message GetBlockLocalPathInfoRequestProto {
* block - block for which file path information is being returned
* localPath - file path where the block data is stored
* localMetaPath - file path where the block meta data is stored
+ *
+ * This message is deprecated in favor of file descriptor passing.
*/
message GetBlockLocalPathInfoResponseProto {
required ExtendedBlockProto block = 1;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index 8ce5fd75661..bba125c4cab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -114,6 +114,16 @@ message OpBlockChecksumProto {
required BaseHeaderProto header = 1;
}
+message OpRequestShortCircuitAccessProto {
+ required BaseHeaderProto header = 1;
+
+ /** In order to get short-circuit access to block data, clients must set this
+ * to the highest version of the block data that they can understand.
+ * Currently 1 is the only version, but more versions may exist in the future
+ * if the on-disk format changes.
+ */
+ required uint32 maxVersion = 2;
+}
message PacketHeaderProto {
// All fields must be fixed-length!
@@ -132,6 +142,7 @@ enum Status {
ERROR_EXISTS = 4;
ERROR_ACCESS_TOKEN = 5;
CHECKSUM_OK = 6;
+ ERROR_UNSUPPORTED = 7;
}
message PipelineAckProto {
@@ -164,6 +175,16 @@ message BlockOpResponseProto {
/** explanatory text which may be useful to log on the client side */
optional string message = 5;
+
+ /** If the server chooses to agree to the request of a client for
+ * short-circuit access, it will send a response message with the relevant
+ * file descriptors attached.
+ *
+ * In the body of the message, this version number will be set to the
+ * specific version number of the block data that the client is about to
+ * read.
+ */
+ optional uint32 shortCircuitAccessVersion = 6;
}
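A small client-side fragment (an illustration, not part of the patch) of how the version negotiation described above might be acted on; "resp" is a parsed BlockOpResponseProto and 1 is the highest block format version the client understands.

if (resp.hasShortCircuitAccessVersion() &&
    resp.getShortCircuitAccessVersion() > 1) {
  throw new IOException("DataNode advertised block format version " +
      resp.getShortCircuitAccessVersion() +
      ", but this client can only read version 1");
}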
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
index 1731b2b5f5d..51b0796572d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
/**
* A helper class to setup the cluster, and get to BlockReader and DataNode for a block.
@@ -156,7 +157,7 @@ public class BlockReaderTestUtil {
testBlock.getBlockToken(),
offset, lenToRead,
true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
- nodes[0]);
+ nodes[0], null, false);
}
/**
@@ -168,4 +169,12 @@ public class BlockReaderTestUtil {
return cluster.getDataNode(ipcport);
}
+ public boolean haveRequiredResources() {
+ if (conf.get(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY) != null) {
+ // To use UNIX Domain sockets, we must have the native code loaded.
+ return DomainSocket.getLoadingFailureReason() == null;
+ } else {
+ return true;
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 95008348bda..39f417128a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2189,14 +2189,27 @@ public class MiniDFSCluster {
/**
* Get file corresponding to a block
* @param storageDir storage directory
- * @param blk block to be corrupted
- * @return file corresponding to the block
+ * @param blk the block
+ * @return data file corresponding to the block
*/
public static File getBlockFile(File storageDir, ExtendedBlock blk) {
return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()),
blk.getBlockName());
}
+ /**
+ * Get the latest metadata file corresponding to a block
+ * @param storageDir storage directory
+ * @param blk the block
+ * @return metadata file corresponding to the block
+ */
+ public static File getBlockMetadataFile(File storageDir, ExtendedBlock blk) {
+ return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()),
+ blk.getBlockName() + "_" + blk.getGenerationStamp() +
+ Block.METADATA_EXTENSION);
+
+ }
+
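Worked example (assuming Block.METADATA_EXTENSION is ".meta"): for a block with id 123 and generation stamp 1001, the two helpers resolve to

// getBlockFile(storageDir, blk)         -> <finalizedDir>/blk_123
// getBlockMetadataFile(storageDir, blk) -> <finalizedDir>/blk_123_1001.meta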
/**
* Shut down a cluster if it is not null
* @param cluster cluster reference or null
@@ -2224,7 +2237,7 @@ public class MiniDFSCluster {
}
/**
- * Get files related to a block for a given datanode
+ * Get the block data file for a block from a given datanode
* @param dnIndex Index of the datanode to get block files for
* @param block block for which corresponding files are needed
*/
@@ -2239,6 +2252,24 @@ public class MiniDFSCluster {
}
return null;
}
+
+ /**
+ * Get the block metadata file for a block from a given datanode
+ *
+ * @param dnIndex Index of the datanode to get block files for
+ * @param block block for which corresponding files are needed
+ */
+ public static File getBlockMetadataFile(int dnIndex, ExtendedBlock block) {
+ // Check for block file in the two storage directories of the datanode
+ for (int i = 0; i <= 1; i++) {
+ File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i);
+ File blockMetaFile = getBlockMetadataFile(storageDir, block);
+ if (blockMetaFile.exists()) {
+ return blockMetaFile;
+ }
+ }
+ return null;
+ }
/**
* Throw an exception if the MiniDFSCluster is not started with a single
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
index 7ccd5b6f52f..e35da42a7d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
@@ -17,90 +17,333 @@
*/
package org.apache.hadoop.hdfs;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
import org.junit.Test;
public class TestBlockReaderLocal {
- static MiniDFSCluster cluster;
- static HdfsConfiguration conf;
-
- @BeforeClass
- public static void setupCluster() throws IOException {
- conf = new HdfsConfiguration();
-
- conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
- conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
- false);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- UserGroupInformation.getCurrentUser().getShortUserName());
-
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
- }
-
- @AfterClass
- public static void teardownCluster() {
- cluster.shutdown();
+ public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
+ int off2, int len) {
+ for (int i = 0; i < len; i++) {
+ if (buf1[off1 + i] != buf2[off2 + i]) {
+ Assert.fail("arrays differ at byte " + i + ". " +
+ "The first array has " + (int)buf1[off1 + i] +
+ ", but the second array has " + (int)buf2[off2 + i]);
+ }
+ }
}
/**
- * Test that, in the case of an error, the position and limit of a ByteBuffer
- * are left unchanged. This is not mandated by ByteBufferReadable, but clients
- * of this class might immediately issue a retry on failure, so it's polite.
+ * Similar to IOUtils#readFully(). Reads bytes in a loop.
+ *
+ * @param reader The BlockReaderLocal to read bytes from
+ * @param buf The ByteBuffer to read into
+ * @param off The offset in the buffer to read into
+ * @param len The number of bytes to read.
+ *
+ * @throws IOException If it could not read the requested number of bytes
*/
+ private static void readFully(BlockReaderLocal reader,
+ ByteBuffer buf, int off, int len) throws IOException {
+ int amt = len;
+ while (amt > 0) {
+ buf.limit(off + len);
+ buf.position(off);
+ long ret = reader.read(buf);
+ if (ret < 0) {
+ throw new EOFException( "Premature EOF from BlockReaderLocal " +
+ "after reading " + (len - amt) + " byte(s).");
+ }
+ amt -= ret;
+ off += ret;
+ }
+ }
+
+ private static interface BlockReaderLocalTest {
+ final int TEST_LENGTH = 12345;
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException;
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException;
+ }
+
+ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
+ boolean checksum) throws IOException {
+ MiniDFSCluster cluster = null;
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
+ conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+ FileInputStream dataIn = null, checkIn = null;
+ final Path TEST_PATH = new Path("/a");
+ final long RANDOM_SEED = 4567L;
+ BlockReaderLocal blockReaderLocal = null;
+ FSDataInputStream fsIn = null;
+ byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ fsIn = fs.open(TEST_PATH);
+ IOUtils.readFully(fsIn, original, 0,
+ BlockReaderLocalTest.TEST_LENGTH);
+ fsIn.close();
+ fsIn = null;
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+ File dataFile = MiniDFSCluster.getBlockFile(0, block);
+ File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
+
+ DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId();
+ cluster.shutdown();
+ cluster = null;
+ test.setup(dataFile, checksum);
+ dataIn = new FileInputStream(dataFile);
+ checkIn = new FileInputStream(metaFile);
+ blockReaderLocal = new BlockReaderLocal(conf,
+ TEST_PATH.getName(), block, 0, -1,
+ dataIn, checkIn, datanodeID, checksum);
+ dataIn = null;
+ checkIn = null;
+ test.doTest(blockReaderLocal, original);
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (cluster != null) cluster.shutdown();
+ if (dataIn != null) dataIn.close();
+ if (checkIn != null) checkIn.close();
+ if (blockReaderLocal != null) blockReaderLocal.close(null, null);
+ }
+ }
+
+ private static class TestBlockReaderLocalImmediateClose
+ implements BlockReaderLocalTest {
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException { }
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException { }
+ }
+
@Test
- public void testStablePositionAfterCorruptRead() throws Exception {
- final short REPL_FACTOR = 1;
- final long FILE_LENGTH = 512L;
- cluster.waitActive();
- FileSystem fs = cluster.getFileSystem();
+ public void testBlockReaderLocalImmediateClose() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true);
+ runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false);
+ }
+
+ private static class TestBlockReaderSimpleReads
+ implements BlockReaderLocalTest {
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException { }
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ byte buf[] = new byte[TEST_LENGTH];
+ reader.readFully(buf, 0, 512);
+ assertArrayRegionsEqual(original, 0, buf, 0, 512);
+ reader.readFully(buf, 512, 512);
+ assertArrayRegionsEqual(original, 512, buf, 512, 512);
+ reader.readFully(buf, 1024, 513);
+ assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
+ reader.readFully(buf, 1537, 514);
+ assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
+ }
+ }
+
+ @Test
+ public void testBlockReaderSimpleReads() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true);
+ }
- Path path = new Path("/corrupted");
+ @Test
+ public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false);
+ }
+
+ private static class TestBlockReaderLocalArrayReads2
+ implements BlockReaderLocalTest {
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException { }
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ byte buf[] = new byte[TEST_LENGTH];
+ reader.readFully(buf, 0, 10);
+ assertArrayRegionsEqual(original, 0, buf, 0, 10);
+ reader.readFully(buf, 10, 100);
+ assertArrayRegionsEqual(original, 10, buf, 10, 100);
+ reader.readFully(buf, 110, 700);
+ assertArrayRegionsEqual(original, 110, buf, 110, 700);
+ reader.readFully(buf, 810, 1); // from offset 810 to offset 811
+ reader.readFully(buf, 811, 5);
+ assertArrayRegionsEqual(original, 811, buf, 811, 5);
+ reader.readFully(buf, 816, 900); // skip from offset 816 to offset 1716
+ reader.readFully(buf, 1716, 5);
+ assertArrayRegionsEqual(original, 1716, buf, 1716, 5);
+ }
+ }
+
+ @Test
+ public void testBlockReaderLocalArrayReads2() throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
+ true);
+ }
- DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
- DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
+ @Test
+ public void testBlockReaderLocalArrayReads2NoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
+ false);
+ }
- ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
- int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
- assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted);
+ private static class TestBlockReaderLocalByteBufferReads
+ implements BlockReaderLocalTest {
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException { }
+ @Override
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+ readFully(reader, buf, 0, 10);
+ assertArrayRegionsEqual(original, 0, buf.array(), 0, 10);
+ readFully(reader, buf, 10, 100);
+ assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+ readFully(reader, buf, 110, 700);
+ assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+ reader.skip(1); // skip from offset 810 to offset 811
+ readFully(reader, buf, 811, 5);
+ assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+ }
+ }
+
+ @Test
+ public void testBlockReaderLocalByteBufferReads()
+ throws IOException {
+ runBlockReaderLocalTest(
+ new TestBlockReaderLocalByteBufferReads(), true);
+ }
- FSDataInputStream dis = cluster.getFileSystem().open(path);
- ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
- boolean sawException = false;
- try {
- dis.read(buf);
- } catch (ChecksumException ex) {
- sawException = true;
+ @Test
+ public void testBlockReaderLocalByteBufferReadsNoChecksum()
+ throws IOException {
+ runBlockReaderLocalTest(
+ new TestBlockReaderLocalByteBufferReads(), false);
+ }
+
+ private static class TestBlockReaderLocalReadCorruptStart
+ implements BlockReaderLocalTest {
+ boolean usingChecksums = false;
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException {
+ RandomAccessFile bf = null;
+ this.usingChecksums = usingChecksums;
+ try {
+ bf = new RandomAccessFile(blockFile, "rw");
+ bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+ } finally {
+ if (bf != null) bf.close();
+ }
}
- assertTrue(sawException);
- assertEquals(0, buf.position());
- assertEquals(buf.capacity(), buf.limit());
-
- dis = cluster.getFileSystem().open(path);
- buf.position(3);
- buf.limit(25);
- sawException = false;
- try {
- dis.read(buf);
- } catch (ChecksumException ex) {
- sawException = true;
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ byte buf[] = new byte[TEST_LENGTH];
+ if (usingChecksums) {
+ try {
+ reader.readFully(buf, 0, 10);
+ Assert.fail("did not detect corruption");
+ } catch (IOException e) {
+ // expected
+ }
+ } else {
+ reader.readFully(buf, 0, 10);
+ }
+ }
+ }
+
+ @Test
+ public void testBlockReaderLocalReadCorruptStart()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
+ }
+
+ private static class TestBlockReaderLocalReadCorrupt
+ implements BlockReaderLocalTest {
+ boolean usingChecksums = false;
+ @Override
+ public void setup(File blockFile, boolean usingChecksums)
+ throws IOException {
+ RandomAccessFile bf = null;
+ this.usingChecksums = usingChecksums;
+ try {
+ bf = new RandomAccessFile(blockFile, "rw");
+ bf.seek(1539);
+ bf.write(new byte[] {0,0,0,0,0,0,0,0,0,0,0,0,0,0});
+ } finally {
+ if (bf != null) bf.close();
+ }
}
- assertTrue(sawException);
- assertEquals(3, buf.position());
- assertEquals(25, buf.limit());
+ public void doTest(BlockReaderLocal reader, byte original[])
+ throws IOException {
+ byte buf[] = new byte[TEST_LENGTH];
+ try {
+ reader.readFully(buf, 0, 10);
+ assertArrayRegionsEqual(original, 0, buf, 0, 10);
+ reader.readFully(buf, 10, 100);
+ assertArrayRegionsEqual(original, 10, buf, 10, 100);
+ reader.readFully(buf, 110, 700);
+ assertArrayRegionsEqual(original, 110, buf, 110, 700);
+ reader.skip(1); // skip from offset 810 to offset 811
+ reader.readFully(buf, 811, 5);
+ assertArrayRegionsEqual(original, 811, buf, 811, 5);
+ reader.readFully(buf, 816, 900);
+ if (usingChecksums) {
+ // We should detect the corruption when using a checksum file.
+ Assert.fail("did not detect corruption");
+ }
+ } catch (ChecksumException e) {
+ if (!usingChecksums) {
+ Assert.fail("didn't expect to get ChecksumException: not " +
+ "using checksums.");
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testBlockReaderLocalReadCorrupt()
+ throws IOException {
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
+ runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
index 2a0e0a85565..4a88e0b1d30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
@@ -61,7 +61,7 @@ public class TestClientBlockVerification {
util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close(null, null);
}
/**
@@ -76,7 +76,7 @@ public class TestClientBlockVerification {
// We asked the blockreader for the whole file, and only read
// half of it, so no CHECKSUM_OK
verify(reader, never()).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close(null, null);
}
/**
@@ -92,7 +92,7 @@ public class TestClientBlockVerification {
// And read half the file
util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close(null, null);
}
/**
@@ -111,7 +111,7 @@ public class TestClientBlockVerification {
util.getBlockReader(testBlock, startOffset, length));
util.readAndCheckEOS(reader, length, true);
verify(reader).sendReadResult(Status.CHECKSUM_OK);
- reader.close(null);
+ reader.close(null, null);
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
new file mode 100644
index 00000000000..7f28a43ceb4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileInputStreamCache.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.Test;
+
+public class TestFileInputStreamCache {
+ static final Log LOG = LogFactory.getLog(TestFileInputStreamCache.class);
+
+ @Test
+ public void testCreateAndDestroy() throws Exception {
+ FileInputStreamCache cache = new FileInputStreamCache(10, 1000);
+ cache.close();
+ }
+
+ private static class TestFileDescriptorPair {
+ TemporarySocketDirectory dir = new TemporarySocketDirectory();
+ FileInputStream fis[];
+
+ public TestFileDescriptorPair() throws IOException {
+ fis = new FileInputStream[2];
+ for (int i = 0; i < 2; i++) {
+ String name = dir.getDir() + "/file" + i;
+ FileOutputStream fos = new FileOutputStream(name);
+ fos.write(1);
+ fos.close();
+ fis[i] = new FileInputStream(name);
+ }
+ }
+
+ public FileInputStream[] getFileInputStreams() {
+ return fis;
+ }
+
+ public void close() throws IOException {
+ IOUtils.cleanup(LOG, fis);
+ dir.close();
+ }
+
+ public boolean compareWith(FileInputStream other[]) {
+ if ((other == null) || (fis == null)) {
+ return other == fis;
+ }
+ if (fis.length != other.length) return false;
+ for (int i = 0; i < fis.length; i++) {
+ if (fis[i] != other[i]) return false;
+ }
+ return true;
+ }
+ }
+
+ @Test
+ public void testAddAndRetrieve() throws Exception {
+ FileInputStreamCache cache = new FileInputStreamCache(1, 1000000);
+ DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost",
+ "xyzzy", 8080, 9090, 7070);
+ ExtendedBlock block = new ExtendedBlock("poolid", 123);
+ TestFileDescriptorPair pair = new TestFileDescriptorPair();
+ cache.put(dnId, block, pair.getFileInputStreams());
+ FileInputStream fis[] = cache.get(dnId, block);
+ Assert.assertTrue(pair.compareWith(fis));
+ pair.close();
+ cache.close();
+ }
+
+ @Test
+ public void testExpiry() throws Exception {
+ FileInputStreamCache cache = new FileInputStreamCache(1, 10);
+ DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost",
+ "xyzzy", 8080, 9090, 7070);
+ ExtendedBlock block = new ExtendedBlock("poolid", 123);
+ TestFileDescriptorPair pair = new TestFileDescriptorPair();
+ cache.put(dnId, block, pair.getFileInputStreams());
+ Thread.sleep(cache.getExpiryTimeMs() * 100);
+ FileInputStream fis[] = cache.get(dnId, block);
+ Assert.assertNull(fis);
+ pair.close();
+ cache.close();
+ }
+
+ @Test
+ public void testEviction() throws Exception {
+ FileInputStreamCache cache = new FileInputStreamCache(1, 10000000);
+ DatanodeID dnId = new DatanodeID("127.0.0.1", "localhost",
+ "xyzzy", 8080, 9090, 7070);
+ ExtendedBlock block = new ExtendedBlock("poolid", 123);
+ TestFileDescriptorPair pair = new TestFileDescriptorPair();
+ cache.put(dnId, block, pair.getFileInputStreams());
+ DatanodeID dnId2 = new DatanodeID("127.0.0.1", "localhost",
+ "xyzzy", 8081, 9091, 7071);
+ TestFileDescriptorPair pair2 = new TestFileDescriptorPair();
+ cache.put(dnId2, block, pair2.getFileInputStreams());
+ FileInputStream fis[] = cache.get(dnId, block);
+ Assert.assertNull(fis);
+ FileInputStream fis2[] = cache.get(dnId2, block);
+ Assert.assertTrue(pair2.compareWith(fis2));
+ pair.close();
+ cache.close();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
index b4320520354..3ec1a069b0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java
@@ -17,14 +17,10 @@
*/
package org.apache.hadoop.hdfs;
-import java.io.IOException;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Test;
public class TestParallelRead extends TestParallelReadUtil {
-
@BeforeClass
static public void setupCluster() throws Exception {
setupCluster(DEFAULT_REPLICATION_FACTOR, new HdfsConfiguration());
@@ -34,26 +30,4 @@ public class TestParallelRead extends TestParallelReadUtil {
static public void teardownCluster() throws Exception {
TestParallelReadUtil.teardownCluster();
}
-
- /**
- * Do parallel read several times with different number of files and threads.
- *
- * Note that while this is the only "test" in a junit sense, we're actually
- * dispatching a lot more. Failures in the other methods (and other threads)
- * need to be manually collected, which is inconvenient.
- */
- @Test
- public void testParallelReadCopying() throws IOException {
- runTestWorkload(new CopyingReadWorkerHelper());
- }
-
- @Test
- public void testParallelReadByteBuffer() throws IOException {
- runTestWorkload(new DirectReadWorkerHelper());
- }
-
- @Test
- public void testParallelReadMixed() throws IOException {
- runTestWorkload(new MixedWorkloadHelper());
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
index 1c59eca871d..6f10804d5db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java
@@ -32,12 +32,18 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.junit.Assume;
+import org.junit.Ignore;
+import org.junit.Test;
/**
* Driver class for testing the use of DFSInputStream by multiple concurrent
- * readers, using the different read APIs. See subclasses for the actual test
- * cases.
+ * readers, using the different read APIs.
+ *
+ * This class is marked @Ignore so that JUnit does not try to execute the
+ * tests here directly; they are executed from its subclasses.
*/
+@Ignore
public class TestParallelReadUtil {
static final Log LOG = LogFactory.getLog(TestParallelReadUtil.class);
@@ -386,4 +392,28 @@ public class TestParallelReadUtil {
util.shutdown();
}
+ /**
+ * Do parallel reads several times with different numbers of files and threads.
+ *
+ * Note that while these are the only "tests" in a JUnit sense, each one
+ * dispatches a lot more work. Failures in the other methods (and other
+ * threads) need to be collected manually, which is inconvenient.
+ */
+ @Test
+ public void testParallelReadCopying() throws IOException {
+ Assume.assumeTrue(util.haveRequiredResources());
+ runTestWorkload(new CopyingReadWorkerHelper());
+ }
+
+ @Test
+ public void testParallelReadByteBuffer() throws IOException {
+ Assume.assumeTrue(util.haveRequiredResources());
+ runTestWorkload(new DirectReadWorkerHelper());
+ }
+
+ @Test
+ public void testParallelReadMixed() throws IOException {
+ Assume.assumeTrue(util.haveRequiredResources());
+ runTestWorkload(new MixedWorkloadHelper());
+ }
}
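
The structure used here is a common JUnit 4 idiom: the shared @Test methods live in a base class annotated with @Ignore so the test runner does not execute them directly, each concrete subclass contributes its own @BeforeClass/@AfterClass cluster setup, and Assume.assumeTrue() turns missing prerequisites (for example, no native domain-socket support) into skipped rather than failed tests. A stripped-down, hypothetical illustration of the pattern, with invented class names:

// --- AbstractWorkloadTest.java (hypothetical) ---
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;

@Ignore   // JUnit skips this class itself; subclasses inherit its tests.
public class AbstractWorkloadTest {
  protected static boolean resourcesAvailable = false;

  @Test
  public void testWorkload() {
    // Skip (rather than fail) when the environment lacks a prerequisite.
    Assume.assumeTrue(resourcesAvailable);
    // ... run the shared workload against whatever the subclass set up ...
  }
}

// --- ConcreteWorkloadTest.java (hypothetical) ---
import org.junit.AfterClass;
import org.junit.BeforeClass;

public class ConcreteWorkloadTest extends AbstractWorkloadTest {
  @BeforeClass
  public static void setupCluster() {
    resourcesAvailable = true;   // a real subclass starts a MiniDFSCluster here
  }

  @AfterClass
  public static void teardownCluster() {
    // a real subclass shuts the cluster down here
  }
}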
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
new file mode 100644
index 00000000000..776e5d7e030
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitRead.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelShortCircuitRead extends TestParallelReadUtil {
+ private static TemporarySocketDirectory sockDir;
+
+ @BeforeClass
+ static public void setupCluster() throws Exception {
+ sockDir = new TemporarySocketDirectory();
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+ DomainSocket.disableBindPathValidation();
+ setupCluster(1, conf);
+ }
+
+ @AfterClass
+ static public void teardownCluster() throws Exception {
+ sockDir.close();
+ TestParallelReadUtil.teardownCluster();
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
similarity index 53%
rename from hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelLocalRead.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
index 05702d2aa4b..2f2a2c6b96b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelLocalRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelShortCircuitReadNoChecksum.java
@@ -17,52 +17,32 @@
*/
package org.apache.hadoop.hdfs;
-import java.io.IOException;
+import java.io.File;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Test;
-public class TestParallelLocalRead extends TestParallelReadUtil {
+public class TestParallelShortCircuitReadNoChecksum extends TestParallelReadUtil {
+ private static TemporarySocketDirectory sockDir;
@BeforeClass
static public void setupCluster() throws Exception {
+ sockDir = new TemporarySocketDirectory();
HdfsConfiguration conf = new HdfsConfiguration();
-
+ conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
- conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
- false);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- UserGroupInformation.getCurrentUser().getShortUserName());
-
+ conf.setBoolean(DFSConfigKeys.
+ DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
+ DomainSocket.disableBindPathValidation();
setupCluster(1, conf);
}
@AfterClass
static public void teardownCluster() throws Exception {
+ sockDir.close();
TestParallelReadUtil.teardownCluster();
}
-
- /**
- * Do parallel read several times with different number of files and threads.
- *
- * Note that while this is the only "test" in a junit sense, we're actually
- * dispatching a lot more. Failures in the other methods (and other threads)
- * need to be manually collected, which is inconvenient.
- */
- @Test
- public void testParallelReadCopying() throws IOException {
- runTestWorkload(new CopyingReadWorkerHelper());
- }
-
- @Test
- public void testParallelReadByteBuffer() throws IOException {
- runTestWorkload(new DirectReadWorkerHelper());
- }
-
- @Test
- public void testParallelReadMixed() throws IOException {
- runTestWorkload(new MixedWorkloadHelper());
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java
new file mode 100644
index 00000000000..d2170d32bfe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelUnixDomainRead.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestParallelUnixDomainRead extends TestParallelReadUtil {
+ private static TemporarySocketDirectory sockDir;
+
+ @BeforeClass
+ static public void setupCluster() throws Exception {
+ sockDir = new TemporarySocketDirectory();
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(), "TestParallelLocalRead.%d.sock").getAbsolutePath());
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false);
+ DomainSocket.disableBindPathValidation();
+ setupCluster(1, conf);
+ }
+
+ @AfterClass
+ static public void teardownCluster() throws Exception {
+ sockDir.close();
+ TestParallelReadUtil.teardownCluster();
+ }
+}
\ No newline at end of file
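
Taken together, the three subclasses above (TestParallelShortCircuitRead, TestParallelShortCircuitReadNoChecksum, TestParallelUnixDomainRead) run the same inherited workloads and differ only in the HdfsConfiguration their @BeforeClass methods build. A condensed, illustrative helper summarising those setups (the class and method names here are invented, not part of the patch):

import java.io.File;

import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// Illustrative summary of the three @BeforeClass configurations above.
class ParallelReadConfigSketch {
  static HdfsConfiguration make(File socketPath, boolean shortCircuit,
      Boolean skipChecksum) {
    HdfsConfiguration conf = new HdfsConfiguration();
    // All three variants point the datanode at a UNIX domain socket.
    conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
        socketPath.getAbsolutePath());
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
        shortCircuit);
    if (skipChecksum != null) {
      conf.setBoolean(
          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
          skipChecksum);
    }
    return conf;
  }

  // TestParallelShortCircuitRead:           make(path, true,  false)
  // TestParallelShortCircuitReadNoChecksum: make(path, true,  true)
  // TestParallelUnixDomainRead:             make(path, false, null)
  //   (the domain-socket-only variant leaves skip-checksum at its default)
}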
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
index 623c1f656c2..3f21e230e90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertTrue;
import java.io.EOFException;
+import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,21 +32,22 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
/**
@@ -55,8 +58,18 @@ import org.junit.Test;
* system.
*/
public class TestShortCircuitLocalRead {
+ private static TemporarySocketDirectory sockDir;
- static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
+ @BeforeClass
+ public static void init() {
+ sockDir = new TemporarySocketDirectory();
+ DomainSocket.disableBindPathValidation();
+ }
+
+ @AfterClass
+ public static void shutdown() throws IOException {
+ sockDir.close();
+ }
static final long seed = 0xDEADBEEFL;
static final int blockSize = 5120;
@@ -81,7 +94,9 @@ public class TestShortCircuitLocalRead {
for (int idx = 0; idx < len; idx++) {
if (expected[from + idx] != actual[idx]) {
Assert.fail(message + " byte " + (from + idx) + " differs. expected "
- + expected[from + idx] + " actual " + actual[idx]);
+ + expected[from + idx] + " actual " + actual[idx] +
+ "\nexpected: " + StringUtils.byteToHexString(expected, from, from + len) +
+ "\nactual: " + StringUtils.byteToHexString(actual, 0, len));
}
}
}
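
The richer failure message added to checkData above comes from StringUtils.byteToHexString, which formats a sub-range of a byte array as a hex string. A small, stand-alone illustration with made-up values:

import org.apache.hadoop.util.StringUtils;

// Shows the hex-dump style used in the new assertion message; the bytes
// here are arbitrary sample data, not taken from any test above.
public class HexDumpExample {
  public static void main(String[] args) {
    byte[] expected = { 0x00, 0x11, 0x22, 0x33, 0x44 };
    byte[] actual   = { 0x00, 0x11, 0x2f, 0x33, 0x44 };
    int from = 0, len = expected.length;
    System.out.println("expected: "
        + StringUtils.byteToHexString(expected, from, from + len));   // 0011223344
    System.out.println("actual:   "
        + StringUtils.byteToHexString(actual, 0, len));               // 00112f3344
  }
}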
@@ -170,8 +185,9 @@ public class TestShortCircuitLocalRead {
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
ignoreChecksum);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- UserGroupInformation.getCurrentUser().getShortUserName());
+ conf.set(DFSConfigKeys.DFS_DATANODE_DOMAIN_SOCKET_PATH_KEY,
+ new File(sockDir.getDir(),
+ "TestShortCircuitLocalRead.__PORT__.sock").getAbsolutePath());
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
@@ -229,23 +245,17 @@ public class TestShortCircuitLocalRead {
doTestShortCircuitRead(false, 10*blockSize+100, 777);
doTestShortCircuitRead(true, 10*blockSize+100, 777);
}
-
+
@Test
- public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
+ public void testDeprecatedGetBlockLocalPathInfoRpc()
+ throws IOException, InterruptedException {
final Configuration conf = new Configuration();
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- "alloweduser1,alloweduser2");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
cluster.waitActive();
- final DataNode dn = cluster.getDataNodes().get(0);
FileSystem fs = cluster.getFileSystem();
try {
DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
- UserGroupInformation aUgi1 =
- UserGroupInformation.createRemoteUser("alloweduser1");
- UserGroupInformation aUgi2 =
- UserGroupInformation.createRemoteUser("alloweduser2");
LocatedBlocks lb = cluster.getNameNode().getRpcServer()
.getBlockLocations("/tmp/x", 0, 16);
// Create a new block object, because the block inside LocatedBlock at
@@ -253,51 +263,11 @@ public class TestShortCircuitLocalRead {
ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
- ClientDatanodeProtocol proxy = aUgi1
- .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
- @Override
- public ClientDatanodeProtocol run() throws Exception {
- return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
- 60000, false);
- }
- });
-
- // This should succeed
- BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
- Assert.assertEquals(
- DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
- blpi.getBlockPath());
-
- // Try with the other allowed user
- proxy = aUgi2
- .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
- @Override
- public ClientDatanodeProtocol run() throws Exception {
- return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
- 60000, false);
- }
- });
-
- // This should succeed as well
- blpi = proxy.getBlockLocalPathInfo(blk, token);
- Assert.assertEquals(
- DataNodeTestUtils.getFSDataset(dn).getBlockLocalPathInfo(blk).getBlockPath(),
- blpi.getBlockPath());
-
- // Now try with a disallowed user
- UserGroupInformation bUgi = UserGroupInformation
- .createRemoteUser("notalloweduser");
- proxy = bUgi
- .doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
- @Override
- public ClientDatanodeProtocol run() throws Exception {
- return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
- 60000, false);
- }
- });
+ ClientDatanodeProtocol proxy =
+ DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
try {
proxy.getBlockLocalPathInfo(blk, token);
- Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+ Assert.fail("The call should have failed as this user "
+ " is not allowed to call getBlockLocalPathInfo");
} catch (IOException ex) {
Assert.assertTrue(ex.getMessage().contains(
@@ -315,8 +285,6 @@ public class TestShortCircuitLocalRead {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- UserGroupInformation.getCurrentUser().getShortUserName());
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
@@ -354,6 +322,86 @@ public class TestShortCircuitLocalRead {
cluster.shutdown();
}
}
+
+ @Test
+ public void testHandleTruncatedBlockFile() throws IOException {
+ MiniDFSCluster cluster = null;
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
+ conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
+ final Path TEST_PATH = new Path("/a");
+ final Path TEST_PATH2 = new Path("/b");
+ final long RANDOM_SEED = 4567L;
+ final long RANDOM_SEED2 = 4568L;
+ FSDataInputStream fsIn = null;
+ final int TEST_LENGTH = 3456;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, TEST_PATH,
+ TEST_LENGTH, (short)1, RANDOM_SEED);
+ DFSTestUtil.createFile(fs, TEST_PATH2,
+ TEST_LENGTH, (short)1, RANDOM_SEED2);
+ fsIn = cluster.getFileSystem().open(TEST_PATH2);
+ byte original[] = new byte[TEST_LENGTH];
+ IOUtils.readFully(fsIn, original, 0, TEST_LENGTH);
+ fsIn.close();
+ fsIn = null;
+ try {
+ DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+ } catch (InterruptedException e) {
+ Assert.fail("unexpected InterruptedException during " +
+ "waitReplication: " + e);
+ } catch (TimeoutException e) {
+ Assert.fail("unexpected TimeoutException during " +
+ "waitReplication: " + e);
+ }
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
+ File dataFile = MiniDFSCluster.getBlockFile(0, block);
+ cluster.shutdown();
+ cluster = null;
+ RandomAccessFile raf = null;
+ try {
+ raf = new RandomAccessFile(dataFile, "rw");
+ raf.setLength(0);
+ } finally {
+ if (raf != null) raf.close();
+ }
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ fsIn = fs.open(TEST_PATH);
+ try {
+ byte buf[] = new byte[100];
+ fsIn.seek(2000);
+ fsIn.readFully(buf, 0, buf.length);
+ Assert.fail("shouldn't be able to read from corrupt 0-length " +
+ "block file.");
+ } catch (IOException e) {
+ DFSClient.LOG.error("caught exception ", e);
+ }
+ fsIn.close();
+ fsIn = null;
+
+ // We should still be able to read the other file.
+ // This is important because it indicates that we detected that the
+ // previous block was corrupt, rather than blaming the problem on
+ // communication.
+ fsIn = fs.open(TEST_PATH2);
+ byte buf[] = new byte[original.length];
+ fsIn.readFully(buf, 0, buf.length);
+ TestBlockReaderLocal.assertArrayRegionsEqual(original, 0, buf, 0,
+ original.length);
+ fsIn.close();
+ fsIn = null;
+ } finally {
+ if (fsIn != null) fsIn.close();
+ if (cluster != null) cluster.shutdown();
+ }
+ }
/**
* Test to run benchmarks between shortcircuit read vs regular read with
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
index fc2c6c67912..d68625de36a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
@@ -148,7 +148,7 @@ public class TestBlockTokenWithDFS {
blockReader = BlockReaderFactory.newBlockReader(
conf, file, block, lblock.getBlockToken(), 0, -1,
true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
- nodes[0]);
+ nodes[0], null, false);
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 26926be0f36..ef73d868151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -962,6 +963,12 @@ public class SimulatedFSDataset implements FsDatasetSpi {
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public FileInputStream[] getShortCircuitFdsForRead(ExtendedBlock block)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
@Override
public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks)
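
SimulatedFSDataset has no on-disk block files, so it simply rejects the new getShortCircuitFdsForRead call. For orientation only, a hypothetical sketch (not the FsDatasetImpl code from this patch) of what a file-backed implementation might return, assuming the array holds the block file stream followed by its checksum/metadata file stream:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

// Hypothetical sketch; the real implementation lives elsewhere in this patch.
class ShortCircuitFdsSketch {
  static FileInputStream[] openForRead(File blockFile, File metaFile)
      throws IOException {
    FileInputStream blockIn = new FileInputStream(blockFile);
    boolean success = false;
    try {
      FileInputStream metaIn = new FileInputStream(metaFile);
      success = true;
      // The caller can then hand these descriptors to the client over a
      // domain socket instead of exposing the file paths themselves.
      return new FileInputStream[] { blockIn, metaIn };
    } finally {
      if (!success) {
        blockIn.close();   // avoid leaking the first stream on failure
      }
    }
  }
}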
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index efe0a4cc962..3ba91c4dc1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -32,6 +32,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -281,11 +282,11 @@ public class TestDataNodeVolumeFailure {
String file = BlockReaderFactory.getFileName(targetAddr,
"test-blockpoolid",
block.getBlockId());
- BlockReaderFactory.newBlockReader(conf, file, block,
+ BlockReader blockReader =
+ BlockReaderFactory.newBlockReader(conf, file, block,
lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
- TcpPeerServer.peerFromSocket(s), datanode);
-
- // nothing - if it fails - it will throw and exception
+ TcpPeerServer.peerFromSocket(s), datanode, null, false);
+ blockReader.close(null, null);
}
/**
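
Both block-reader call sites updated above (TestBlockTokenWithDFS and TestDataNodeVolumeFailure) follow the same pattern: newBlockReader now takes two extra trailing arguments, and the reader is closed explicitly with close(null, null). The sketch below shows that caller pattern in isolation; the argument list simply mirrors the test call sites, and the comment on the trailing null/false pair is an assumption, since their meaning is defined earlier in the patch:

import java.io.IOException;
import java.net.Socket;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

// Illustrative caller pattern only; see the real call sites above.
class BlockReaderCallerSketch {
  static void readBlock(Configuration conf, String file, ExtendedBlock block,
      LocatedBlock lblock, Socket s, DatanodeInfo datanode) throws IOException {
    BlockReader blockReader = BlockReaderFactory.newBlockReader(
        conf, file, block, lblock.getBlockToken(), 0, -1,
        true, "BlockReaderCallerSketch", TcpPeerServer.peerFromSocket(s),
        datanode, null, false);
    try {
      // ... read from blockReader ...
    } finally {
      // Callers now close the reader themselves; passing null for both
      // cache arguments presumably just closes the underlying resources
      // rather than returning them to a cache.
      blockReader.close(null, null);
    }
  }
}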