Merged r1204792 from trunk for HDFS-2246.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1205694 13f79535-47bb-0310-9956-ffa450edef68
Jitendra Nath Pandey 2011-11-24 01:42:01 +00:00
parent 11b908bf27
commit 69931bef78
17 changed files with 1084 additions and 29 deletions

View File

@ -46,6 +46,10 @@ Release 0.23.1 - UNRELEASED
HDFS-2129. Simplify BlockReader to not inherit from FSInputChecker.
(todd)
HDFS-2246. Enable reading a block directly from local file system
for a client on the same node as the block file. (Andrew Purtell,
Suresh Srinivas and Jitendra Nath Pandey via szetszwo)
BUG FIXES
HDFS-2541. For a sufficiently large value of blocks, the DN Scanner

View File

@ -0,0 +1,380 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
/**
* BlockReaderLocal enables local short circuited reads. If the DFS client is on
* the same machine as the datanode, then the client can read files directly
* from the local file system rather than going through the datanode for better
* performance. <br>
* {@link BlockReaderLocal} works as follows:
* <ul>
* <li>The client performing short circuit reads must be configured at the
* datanode.</li>
* <li>The client gets the path to the file where block is stored using
* {@link org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol#getBlockLocalPathInfo(ExtendedBlock, Token)}
* RPC call</li>
* <li>Client uses kerberos authentication to connect to the datanode over RPC,
* if security is enabled.</li>
* </ul>
*/
class BlockReaderLocal extends RemoteBlockReader2 {
public static final Log LOG = LogFactory.getLog(DFSClient.class);
//Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo {
private ClientDatanodeProtocol proxy = null;
private final Map<ExtendedBlock, BlockLocalPathInfo> cache;
LocalDatanodeInfo() {
final int cacheSize = 10000;
final float hashTableLoadFactor = 0.75f;
int hashTableCapacity = (int) Math.ceil(cacheSize / hashTableLoadFactor) + 1;
cache = Collections
.synchronizedMap(new LinkedHashMap<ExtendedBlock, BlockLocalPathInfo>(
hashTableCapacity, hashTableLoadFactor, true) {
private static final long serialVersionUID = 1;
@Override
protected boolean removeEldestEntry(
Map.Entry<ExtendedBlock, BlockLocalPathInfo> eldest) {
return size() > cacheSize;
}
});
}
private synchronized ClientDatanodeProtocol getDatanodeProxy(
DatanodeInfo node, Configuration conf, int socketTimeout)
throws IOException {
if (proxy == null) {
proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
socketTimeout);
}
return proxy;
}
private synchronized void resetDatanodeProxy() {
if (null != proxy) {
RPC.stopProxy(proxy);
proxy = null;
}
}
private BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) {
return cache.get(b);
}
private void setBlockLocalPathInfo(ExtendedBlock b, BlockLocalPathInfo info) {
cache.put(b, info);
}
private void removeBlockLocalPathInfo(ExtendedBlock b) {
cache.remove(b);
}
}
// Multiple datanodes could be running on the local machine. Store proxies in
// a map keyed by the ipc port of the datanode.
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
private final FileInputStream dataIn; // reader for the data file
private FileInputStream checksumIn; // reader for the checksum file
private int offsetFromChunkBoundary;
ByteBuffer dataBuff = null;
ByteBuffer checksumBuff = null;
/**
* The only way this object can be instantiated.
*/
static BlockReaderLocal newBlockReader(Configuration conf, String file,
ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
int socketTimeout, long startOffset, long length) throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort());
// check the cache first
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
if (pathinfo == null) {
pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
}
// check to see if the file exists. It may so happen that the
// HDFS file has been deleted and this block-lookup is occurring
// on behalf of a new HDFS file. This time, the block file could
// be residing in a different portion of the fs.data.dir directory.
// In this case, we remove this entry from the cache. The next
// call to this method will re-populate the cache.
FileInputStream dataIn = null;
FileInputStream checksumIn = null;
BlockReaderLocal localBlockReader = null;
boolean skipChecksumCheck = skipChecksumCheck(conf);
try {
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
dataIn = new FileInputStream(blkfile);
if (LOG.isDebugEnabled()) {
LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
+ blkfile.length() + " startOffset " + startOffset + " length "
+ length + " short circuit checksum " + skipChecksumCheck);
}
if (!skipChecksumCheck) {
// get the metadata file
File metafile = new File(pathinfo.getMetaPath());
checksumIn = new FileInputStream(metafile);
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader
.readHeader(new DataInputStream(checksumIn));
short version = header.getVersion();
if (version != FSDataset.METADATA_VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ blk + " ignoring ...");
}
DataChecksum checksum = header.getChecksum();
long firstChunkOffset = startOffset
- (startOffset % checksum.getBytesPerChecksum());
localBlockReader = new BlockReaderLocal(conf, file, blk, token,
startOffset, length, pathinfo, checksum, true, dataIn,
firstChunkOffset, checksumIn);
} else {
localBlockReader = new BlockReaderLocal(conf, file, blk, token,
startOffset, length, pathinfo, dataIn);
}
} catch (IOException e) {
// remove from cache
localDatanodeInfo.removeBlockLocalPathInfo(blk);
DFSClient.LOG.warn("BlockReaderLocal: Removing " + blk
+ " from cache because local file " + pathinfo.getBlockPath()
+ " could not be opened.");
throw e;
} finally {
if (localBlockReader == null) {
if (dataIn != null) {
dataIn.close();
}
if (checksumIn != null) {
checksumIn.close();
}
}
}
return localBlockReader;
}
private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
LocalDatanodeInfo ldInfo = localDatanodeInfoMap.get(port);
if (ldInfo == null) {
ldInfo = new LocalDatanodeInfo();
localDatanodeInfoMap.put(port, ldInfo);
}
return ldInfo;
}
private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
DatanodeInfo node, Configuration conf, int timeout,
Token<BlockTokenIdentifier> token) throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.ipcPort);
BlockLocalPathInfo pathinfo = null;
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
conf, timeout);
try {
// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
if (pathinfo != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cached location of block " + blk + " as " + pathinfo);
}
localDatanodeInfo.setBlockLocalPathInfo(blk, pathinfo);
}
} catch (IOException e) {
localDatanodeInfo.resetDatanodeProxy(); // Reset proxy on error
throw e;
}
return pathinfo;
}
private static boolean skipChecksumCheck(Configuration conf) {
return conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
}
private BlockReaderLocal(Configuration conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
throws IOException {
this(conf, hdfsfile, block, token, startOffset, length, pathinfo,
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL, 4), false,
dataIn, startOffset, null);
}
private BlockReaderLocal(Configuration conf, String hdfsfile,
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
FileInputStream checksumIn) throws IOException {
super(hdfsfile, block.getBlockPoolId(), block.getBlockId(), dataIn
.getChannel(), checksum, verifyChecksum, startOffset, firstChunkOffset,
length, null);
this.dataIn = dataIn;
this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
dataBuff = bufferPool.getBuffer(bytesPerChecksum*64);
checksumBuff = bufferPool.getBuffer(checksumSize*64);
//Initially the buffers have nothing to read.
dataBuff.flip();
checksumBuff.flip();
long toSkip = firstChunkOffset;
while (toSkip > 0) {
long skipped = dataIn.skip(toSkip);
if (skipped == 0) {
throw new IOException("Couldn't initialize input stream");
}
toSkip -= skipped;
}
if (checksumIn != null) {
long checkSumOffset = (firstChunkOffset / bytesPerChecksum)
* checksumSize;
while (checkSumOffset > 0) {
long skipped = checksumIn.skip(checkSumOffset);
if (skipped == 0) {
throw new IOException("Couldn't initialize checksum input stream");
}
checkSumOffset -= skipped;
}
}
}
private int readIntoBuffer(FileInputStream stream, ByteBuffer buf)
throws IOException {
int bytesRead = stream.getChannel().read(buf);
if (bytesRead < 0) {
//EOF
return bytesRead;
}
while (buf.remaining() > 0) {
int n = stream.getChannel().read(buf);
if (n < 0) {
//EOF
return bytesRead;
}
bytesRead += n;
}
return bytesRead;
}
@Override
public synchronized int read(byte[] buf, int off, int len) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.info("read off " + off + " len " + len);
}
if (!verifyChecksum) {
return dataIn.read(buf, off, len);
} else {
int dataRead = -1;
if (dataBuff.remaining() == 0) {
dataBuff.clear();
checksumBuff.clear();
dataRead = readIntoBuffer(dataIn, dataBuff);
readIntoBuffer(checksumIn, checksumBuff);
checksumBuff.flip();
dataBuff.flip();
if (verifyChecksum) {
checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
this.startOffset);
}
} else {
dataRead = dataBuff.remaining();
}
if (dataRead > 0) {
int nRead = Math.min(dataRead - offsetFromChunkBoundary, len);
if (offsetFromChunkBoundary > 0) {
dataBuff.position(offsetFromChunkBoundary);
// Its either end of file or dataRead is greater than the
// offsetFromChunkBoundary
offsetFromChunkBoundary = 0;
}
if (nRead > 0) {
dataBuff.get(buf, off, nRead);
return nRead;
} else {
return 0;
}
} else {
return -1;
}
}
}
@Override
public synchronized long skip(long n) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("skip " + n);
}
if (!verifyChecksum) {
return dataIn.skip(n);
} else {
return super.skip(n);
}
}
@Override
public synchronized void close() throws IOException {
dataIn.close();
if (checksumIn != null) {
checksumIn.close();
}
if (dataBuff != null) {
bufferPool.returnBuffer(dataBuff);
dataBuff = null;
}
if (checksumBuff != null) {
bufferPool.returnBuffer(checksumBuff);
checksumBuff = null;
}
super.close();
}
}
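The LocalDatanodeInfo cache above uses the standard LinkedHashMap eviction hook to cap the block-path cache at 10000 entries. A minimal, generic sketch of that LRU technique, for reference only (the LruCache class and its method names are illustrative, not part of this patch):

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative LRU cache mirroring LocalDatanodeInfo: a LinkedHashMap in
// access order whose removeEldestEntry() evicts the least recently used
// entry once a fixed capacity is exceeded.
public class LruCache<K, V> {
  private final Map<K, V> map;

  public LruCache(final int maxEntries) {
    final float loadFactor = 0.75f;
    int capacity = (int) Math.ceil(maxEntries / loadFactor) + 1;
    map = Collections.synchronizedMap(new LinkedHashMap<K, V>(
        capacity, loadFactor, true /* access order */) {
      private static final long serialVersionUID = 1L;
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
      }
    });
  }

  public V get(K key) { return map.get(key); }
  public void put(K key, V value) { map.put(key, value); }
  public void remove(K key) { map.remove(key); }
}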

View File

@ -24,12 +24,18 @@ import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.net.SocketFactory;
@ -77,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@ -99,6 +106,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
@ -230,7 +238,8 @@ public class DFSClient implements java.io.Closeable {
*/
private final Map<String, DFSOutputStream> filesBeingWritten
= new HashMap<String, DFSOutputStream>();
private boolean shortCircuitLocalReads;
/**
* Same as this(NameNode.getAddress(conf), conf);
* @see #DFSClient(InetSocketAddress, Configuration)
@ -294,6 +303,13 @@ public class DFSClient implements java.io.Closeable {
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: " "Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
+ "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode); + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
} }
// read directly from the block file if configured.
this.shortCircuitLocalReads = conf.getBoolean(
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug("Short circuit read is " + shortCircuitLocalReads);
}
}
/**
@ -481,6 +497,82 @@ public class DFSClient implements java.io.Closeable {
}
}
/**
* Get {@link BlockReader} for short circuited local reads.
*/
static BlockReader getLocalBlockReader(Configuration conf,
String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
throws InvalidToken, IOException {
try {
return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
- offsetIntoBlock);
} catch (RemoteException re) {
throw re.unwrapRemoteException(InvalidToken.class,
AccessControlException.class);
}
}
private static Set<String> localIpAddresses = Collections
.synchronizedSet(new HashSet<String>());
private static boolean isLocalAddress(InetSocketAddress targetAddr) {
InetAddress addr = targetAddr.getAddress();
if (localIpAddresses.contains(addr.getHostAddress())) {
if (LOG.isTraceEnabled()) {
LOG.trace("Address " + targetAddr + " is local");
}
return true;
}
// Check if the address is any local or loop back
boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
// Check if the address is defined on any interface
if (!local) {
try {
local = NetworkInterface.getByInetAddress(addr) != null;
} catch (SocketException e) {
local = false;
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Address " + targetAddr + (local ? " is local" : " is not local"));
}
if (local == true) {
localIpAddresses.add(addr.getHostAddress());
}
return local;
}
/**
* Should the block access token be refetched on an exception
*
* @param ex Exception received
* @param targetAddr Target datanode address from where exception was received
* @return true if block access token has expired or invalid and it should be
* refetched
*/
private static boolean tokenRefetchNeeded(IOException ex,
InetSocketAddress targetAddr) {
/*
* Get a new access token and retry. Retry is needed in 2 cases. 1) When
* both NN and DN re-started while DFSClient holding a cached access token.
* 2) In the case that NN fails to update its access key at pre-set interval
* (by a wide margin) and subsequently restarts. In this case, DN
* re-registers itself with NN and receives a new access key, but DN will
* delete the old access key from its memory since it's considered expired
* based on the estimated expiration date.
*/
if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
LOG.info("Access token was invalid when connecting to " + targetAddr
+ " : " + ex);
return true;
}
return false;
}
/**
* Cancel a delegation token
* @param token the token to cancel
@ -1572,7 +1664,7 @@ public class DFSClient implements java.io.Closeable {
synchronized List<LocatedBlock> getAllBlocks() throws IOException {
return ((DFSInputStream)in).getAllBlocks();
}
/**
* @return The visible length of the file.
*/
@ -1580,6 +1672,14 @@ public class DFSClient implements java.io.Closeable {
return ((DFSInputStream)in).getFileLength();
}
}
boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
throws IOException {
if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
return true;
}
return false;
}
void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
DatanodeInfo [] dnArr = { dn };
@ -1602,4 +1702,8 @@ public class DFSClient implements java.io.Closeable {
return getClass().getSimpleName() + "[clientName=" + clientName
+ ", ugi=" + ugi + "]";
}
void disableShortCircuit() {
shortCircuitLocalReads = false;
}
}
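The isLocalAddress() helper added above decides whether a datanode address belongs to this host before a read is short-circuited. A standalone sketch of the same check, assuming only the standard java.net API (the class and method names below are illustrative). DFSClient additionally caches positive results in localIpAddresses, so the interface lookup runs only once per datanode address.

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketException;

public class LocalAddressCheck {
  // Returns true if the target address is bound to this host: either a
  // wildcard/loopback address, or an address assigned to a local interface.
  static boolean isLocal(InetSocketAddress targetAddr) {
    InetAddress addr = targetAddr.getAddress();
    boolean local = addr.isAnyLocalAddress() || addr.isLoopbackAddress();
    if (!local) {
      try {
        // getByInetAddress returns null when no local interface owns addr.
        local = NetworkInterface.getByInetAddress(addr) != null;
      } catch (SocketException e) {
        local = false;
      }
    }
    return local;
  }

  public static void main(String[] args) throws Exception {
    InetSocketAddress a = new InetSocketAddress(InetAddress.getLocalHost(), 50020);
    System.out.println(a + " local? " + isLocal(a));
  }
}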

View File

@ -261,6 +261,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY = "dfs.corruptfilesreturned.max";
public static final int DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED = 500;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_KEY = "dfs.client.read.shortcircuit";
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@ -301,4 +305,5 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_CHECKED_VOLUMES_KEY = "dfs.namenode.resource.checked.volumes";
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY = "dfs.web.authentication.kerberos.principal";
public static final String DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY = "dfs.web.authentication.kerberos.keytab";
public static final String DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY = "dfs.block.local-path-access.user";
}
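These three keys are the whole configuration surface of the feature. A hedged sketch of setting them programmatically, mirroring what TestShortCircuitLocalRead does further down; using the current user as the allowed local-path user is just an example value.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.UserGroupInformation;

public class ShortCircuitConfExample {
  public static Configuration shortCircuitConf() throws Exception {
    Configuration conf = new Configuration();
    // Client side: allow reads to bypass the datanode when the block is local.
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    // Optionally skip client-side checksum verification for local reads.
    conf.setBoolean(
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
    // Datanode side: the one user allowed to ask for local block paths.
    conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
        UserGroupInformation.getCurrentUser().getShortUserName());
    return conf;
  }
}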

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
/****************************************************************
@ -405,11 +406,8 @@ public class DFSInputStream extends FSInputStream {
try {
ExtendedBlock blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum, dfsClient.clientName);
if(connectFailedOnce) {
DFSClient.LOG.info("Successfully connected to " + targetAddr +
@ -666,12 +664,9 @@ public class DFSInputStream extends FSInputStream {
Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
int len = (int) (end - start + 1);
reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
blockToken, start, len, buffersize, verifyChecksum,
dfsClient.clientName);
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
@ -684,6 +679,10 @@ public class DFSInputStream extends FSInputStream {
e.getPos() + " from " + chosenNode.getName());
// we want to remember what we have tried
addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
} catch (AccessControlException ex) {
DFSClient.LOG.warn("Short circuit access failed ", ex);
dfsClient.disableShortCircuit();
continue;
} catch (IOException e) {
if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
DFSClient.LOG.info("Will get a new access token and retry, "
@ -726,6 +725,7 @@ public class DFSInputStream extends FSInputStream {
* Otherwise, it will create a new connection.
*
* @param dnAddr Address of the datanode
* @param chosenNode Chosen datanode information
* @param file File location
* @param block The Block object
* @param blockToken The access token for security
@ -737,6 +737,7 @@ public class DFSInputStream extends FSInputStream {
* @return New BlockReader instance
*/
protected BlockReader getBlockReader(InetSocketAddress dnAddr,
DatanodeInfo chosenNode,
String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
@ -746,6 +747,12 @@ public class DFSInputStream extends FSInputStream {
boolean verifyChecksum,
String clientName)
throws IOException {
if (dfsClient.shouldTryShortCircuitRead(dnAddr)) {
return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset);
}
IOException err = null;
boolean fromCache = true;

View File

@ -658,7 +658,22 @@ public class DFSUtil {
return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
rpcNamenode, methodNameToPolicyMap);
}
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout)
throws IOException {
InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getHost()
+ ":" + datanodeid.getIpcPort());
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
}
return (ClientDatanodeProtocol) RPC.getProxy(ClientDatanodeProtocol.class,
ClientDatanodeProtocol.versionID, addr,
UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
/** Create a {@link ClientDatanodeProtocol} proxy */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,

View File

@ -85,7 +85,7 @@ public class RemoteBlockReader2 implements BlockReader {
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
private ReadableByteChannel in;
protected DataChecksum checksum;
private PacketHeader curHeader;
private ByteBuffer curPacketBuf = null;
@ -96,25 +96,25 @@ public class RemoteBlockReader2 implements BlockReader {
private long lastSeqNo = -1;
/** offset in block where reader wants to actually read */
protected long startOffset;
protected final String filename;
protected static DirectBufferPool bufferPool =
new DirectBufferPool();
private ByteBuffer headerBuf = ByteBuffer.allocate(
PacketHeader.PKT_HEADER_LEN);
protected int bytesPerChecksum;
protected int checksumSize;
/**
* The total number of bytes we need to transfer from the DN.
* This is the amount that the user has requested plus some padding
* at the beginning so that the read can begin on a chunk boundary.
*/
protected long bytesNeededToFinish;
protected final boolean verifyChecksum;
private boolean sentStatusCode = false;
@ -271,7 +271,7 @@ public class RemoteBlockReader2 implements BlockReader {
}
}
protected RemoteBlockReader2(String file, String bpid, long blockId,
ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
// Path is used only for printing block and file information in debug

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
/**
* A block and the full path information to the block data file and
* the metadata file stored on the local file system.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockLocalPathInfo implements Writable {
static final WritableFactory FACTORY = new WritableFactory() {
public Writable newInstance() { return new BlockLocalPathInfo(); }
};
static { // register a ctor
WritableFactories.setFactory(BlockLocalPathInfo.class, FACTORY);
}
private ExtendedBlock block;
private String localBlockPath = ""; // local file storing the data
private String localMetaPath = ""; // local file storing the checksum
public BlockLocalPathInfo() {}
/**
* Constructs BlockLocalPathInfo.
* @param b The block corresponding to this local path info.
* @param file Block data file.
* @param metafile Metadata file for the block.
*/
public BlockLocalPathInfo(ExtendedBlock b, String file, String metafile) {
block = b;
localBlockPath = file;
localMetaPath = metafile;
}
/**
* Get the Block data file.
* @return Block data file.
*/
public String getBlockPath() {return localBlockPath;}
/**
* Get the Block metadata file.
* @return Block metadata file.
*/
public String getMetaPath() {return localMetaPath;}
@Override
public void write(DataOutput out) throws IOException {
block.write(out);
Text.writeString(out, localBlockPath);
Text.writeString(out, localMetaPath);
}
@Override
public void readFields(DataInput in) throws IOException {
block = new ExtendedBlock();
block.readFields(in);
localBlockPath = Text.readString(in);
localMetaPath = Text.readString(in);
}
/**
* Get number of bytes in the block.
* @return Number of bytes in the block.
*/
public long getNumBytes() {
return block.getNumBytes();
}
}

View File

@ -24,9 +24,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
/** An client-datanode protocol for block recovery
@ -65,5 +67,30 @@ public interface ClientDatanodeProtocol extends VersionedProtocol {
* deleted along with its contents.
* @throws IOException
*/
void deleteBlockPool(String bpid, boolean force) throws IOException;
/**
* Retrieves the path names of the block file and metadata file stored on the
* local file system.
*
* In order for this method to work, one of the following should be satisfied:
* <ul>
* <li>
* The client user must be configured at the datanode to be able to use this
* method.</li>
* <li>
* When security is enabled, kerberos authentication must be used to connect
* to the datanode.</li>
* </ul>
*
* @param block
* the specified block on the local datanode
* @param token
* the block access token.
* @return the BlockLocalPathInfo of a block
* @throws IOException
* on error
*/
BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
Token<BlockTokenIdentifier> token) throws IOException;
}
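The new getBlockLocalPathInfo RPC above is reached through a ClientDatanodeProtocol proxy obtained from DFSUtil. A minimal sketch, assuming a resolved DatanodeInfo, a block token taken from a LocatedBlock, and a 60 second timeout (all illustrative values; the class below is hypothetical and not part of the patch):

// The sketch lives in org.apache.hadoop.hdfs because the (DatanodeID, conf,
// timeout) overload of createClientDatanodeProtocolProxy is package-private;
// TestShortCircuitLocalRead below makes the same call.
package org.apache.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;

class LocalPathLookupSketch {
  // Ask the local datanode where it stores the block file and its meta file.
  static BlockLocalPathInfo lookup(Configuration conf, DatanodeInfo dn,
      ExtendedBlock blk, Token<BlockTokenIdentifier> token) throws Exception {
    ClientDatanodeProtocol proxy =
        DFSUtil.createClientDatanodeProtocolProxy(dn, conf, 60000);
    try {
      BlockLocalPathInfo info = proxy.getBlockLocalPathInfo(blk, token);
      System.out.println("block file: " + info.getBlockPath()
          + ", meta file: " + info.getMetaPath());
      return info;
    } finally {
      RPC.stopProxy(proxy);
    }
  }
}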

View File

@ -28,6 +28,9 @@ import java.io.RandomAccessFile;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
@ -35,7 +38,9 @@ import org.apache.hadoop.util.DataChecksum;
* This is not related to the Block related functionality in Namenode.
* The biggest part of data block metadata is CRC for the block.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockMetadataHeader {
static final short METADATA_VERSION = FSDataset.METADATA_VERSION;
@ -51,12 +56,14 @@ class BlockMetadataHeader {
this.checksum = checksum;
this.version = version;
}
/** Get the version */
public short getVersion() {
return version;
}
/** Get the checksum */
public DataChecksum getChecksum() {
return checksum;
}
@ -67,7 +74,7 @@ class BlockMetadataHeader {
* @return Metadata Header
* @throws IOException
*/
public static BlockMetadataHeader readHeader(DataInputStream in) throws IOException {
return readHeader(in.readShort(), in);
}

View File

@ -52,6 +52,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
@ -87,6 +88,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -107,6 +109,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@ -127,6 +130,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpServer;
@ -139,8 +143,10 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@ -393,6 +399,8 @@ public class DataNode extends Configured
private AbstractList<File> dataDirs;
private Configuration conf;
private final String userWithLocalPathAccess;
/**
* Create the DataNode given a configuration and an array of dataDirs.
* 'dataDirs' is where the blocks are stored.
@ -411,6 +419,8 @@ public class DataNode extends Configured
final SecureResources resources) throws IOException {
super(conf);
this.userWithLocalPathAccess = conf
.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
try {
hostName = getHostName(conf);
startDataNode(conf, dataDirs, resources);
@ -1051,6 +1061,68 @@ public class DataNode extends Configured
return "DS-" + rand + "-" + ip + "-" + port + "-"
+ System.currentTimeMillis();
}
/** Ensure the authentication method is kerberos */
private void checkKerberosAuthMethod(String msg) throws IOException {
// User invoking the call must be same as the datanode user
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
if (UserGroupInformation.getCurrentUser().getAuthenticationMethod() !=
AuthenticationMethod.KERBEROS) {
throw new AccessControlException("Error in " + msg
+ "Only kerberos based authentication is allowed.");
}
}
private void checkBlockLocalPathAccess() throws IOException {
checkKerberosAuthMethod("getBlockLocalPathInfo()");
String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
if (!currentUser.equals(this.userWithLocalPathAccess)) {
throw new AccessControlException(
"Can't continue with getBlockLocalPathInfo() "
+ "authorization. The user " + currentUser
+ " is not allowed to call getBlockLocalPathInfo");
}
}
@Override
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
Token<BlockTokenIdentifier> token) throws IOException {
checkBlockLocalPathAccess();
checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
if (LOG.isDebugEnabled()) {
if (info != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("getBlockLocalPathInfo successful block=" + block
+ " blockfile " + info.getBlockPath() + " metafile "
+ info.getMetaPath());
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("getBlockLocalPathInfo for block=" + block
+ " returning null");
}
}
}
metrics.incrBlocksGetLocalPathInfo();
return info;
}
private void checkBlockToken(ExtendedBlock block, Token<BlockTokenIdentifier> token,
AccessMode accessMode) throws IOException {
if (isBlockTokenEnabled && UserGroupInformation.isSecurityEnabled()) {
BlockTokenIdentifier id = new BlockTokenIdentifier();
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
id.readFields(in);
if (LOG.isDebugEnabled()) {
LOG.debug("Got: " + id.toString());
}
blockPoolTokenSecretManager.checkAccess(id, null, block, accessMode);
}
}
/**
* Shut down this instance of the datanode.

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@ -2655,4 +2656,14 @@ public class FSDataset implements FSDatasetInterface {
volume.deleteBPDirectories(bpid, force);
}
}
@Override // FSDatasetInterface
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
File datafile = getBlockFile(block);
File metafile = getMetaFile(datafile, block.getGenerationStamp());
BlockLocalPathInfo info = new BlockLocalPathInfo(block,
datafile.getAbsolutePath(), metafile.getAbsolutePath());
return info;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.Closeable;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
@ -402,4 +404,9 @@ public interface FSDatasetInterface extends FSDatasetMBean {
* @throws IOException
*/
public void deleteBlockPool(String bpid, boolean force) throws IOException;
/**
* Get {@link BlockLocalPathInfo} for the given block.
**/
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b) throws IOException;
}

View File

@ -60,6 +60,7 @@ public class DataNodeMetrics {
@Metric MutableCounterLong readsFromRemoteClient;
@Metric MutableCounterLong writesFromLocalClient;
@Metric MutableCounterLong writesFromRemoteClient;
@Metric MutableCounterLong blocksGetLocalPathInfo;
@Metric MutableCounterLong volumeFailures;
@ -165,4 +166,9 @@ public class DataNodeMetrics {
public void incrVolumeFailures() {
volumeFailures.incr();
}
/** Increment for getBlockLocalPathInfo calls */
public void incrBlocksGetLocalPathInfo() {
blocksGetLocalPathInfo.incr();
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.security.token.Token;
import org.junit.Test;
@ -209,6 +210,7 @@ public class TestConnCache {
MockGetBlockReader answer = new MockGetBlockReader();
Mockito.doAnswer(answer).when(in).getBlockReader(
(InetSocketAddress) Matchers.anyObject(),
(DatanodeInfo) Matchers.anyObject(),
Matchers.anyString(),
(ExtendedBlock) Matchers.anyObject(),
(Token<BlockTokenIdentifier>) Matchers.anyObject(),

View File

@ -0,0 +1,304 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertTrue;
import java.io.EOFException;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Test;
/**
* Test for short circuit read functionality using {@link BlockReaderLocal}.
* When a block being read by a client is on the local datanode, instead of
* using {@link DataTransferProtocol} and connecting to the datanode, the short
* circuit read allows the client to read the block directly from the files on
* the local file system.
*/
public class TestShortCircuitLocalRead {
static final String DIR = "/" + TestShortCircuitLocalRead.class.getSimpleName() + "/";
static final long seed = 0xDEADBEEFL;
static final int blockSize = 5120;
boolean simulatedStorage = false;
// creates a file but does not close it
static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
throws IOException {
FSDataOutputStream stm = fileSys.create(name, true,
fileSys.getConf().getInt("io.file.buffer.size", 4096),
(short)repl, (long)blockSize);
return stm;
}
static private void checkData(byte[] actual, int from, byte[] expected,
String message) {
checkData(actual, from, expected, actual.length, message);
}
static private void checkData(byte[] actual, int from, byte[] expected, int len,
String message) {
for (int idx = 0; idx < len; idx++) {
if (expected[from + idx] != actual[idx]) {
Assert.fail(message + " byte " + (from + idx) + " differs. expected "
+ expected[from + idx] + " actual " + actual[idx]);
}
}
}
static void checkFileContent(FileSystem fs, Path name, byte[] expected,
int readOffset) throws IOException {
FSDataInputStream stm = fs.open(name);
byte[] actual = new byte[expected.length-readOffset];
stm.readFully(readOffset, actual);
checkData(actual, readOffset, expected, "Read 2");
stm.close();
// Now read using a different API.
actual = new byte[expected.length-readOffset];
stm = fs.open(name);
long skipped = stm.skip(readOffset);
Assert.assertEquals(skipped, readOffset);
//Read a small number of bytes first.
int nread = stm.read(actual, 0, 3);
nread += stm.read(actual, nread, 2);
//Read across chunk boundary
nread += stm.read(actual, nread, 517);
checkData(actual, readOffset, expected, nread, "A few bytes");
//Now read rest of it
while (nread < actual.length) {
int nbytes = stm.read(actual, nread, actual.length - nread);
if (nbytes < 0) {
throw new EOFException("End of file reached before reading fully.");
}
nread += nbytes;
}
checkData(actual, readOffset, expected, "Read 3");
stm.close();
}
/**
* Test that file data can be read by reading the block file
* directly from the local store.
*/
public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
int readOffset) throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
ignoreChecksum);
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName());
if (simulatedStorage) {
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
}
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
FileSystem fs = cluster.getFileSystem();
try {
// check that / exists
Path path = new Path("/");
assertTrue("/ should be a directory", fs.getFileStatus(path)
.isDirectory() == true);
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
// create a new file in home directory. Do not close it.
Path file1 = new Path("filelocal.dat");
FSDataOutputStream stm = createFile(fs, file1, 1);
// write to file
stm.write(fileData);
stm.close();
checkFileContent(fs, file1, fileData, readOffset);
} finally {
fs.close();
cluster.shutdown();
}
}
@Test
public void testFileLocalReadNoChecksum() throws IOException {
doTestShortCircuitRead(true, 3*blockSize+100, 0);
}
@Test
public void testFileLocalReadChecksum() throws IOException {
doTestShortCircuitRead(false, 3*blockSize+100, 0);
}
@Test
public void testSmallFileLocalRead() throws IOException {
doTestShortCircuitRead(false, 13, 0);
doTestShortCircuitRead(false, 13, 5);
doTestShortCircuitRead(true, 13, 0);
doTestShortCircuitRead(true, 13, 5);
}
@Test
public void testReadFromAnOffset() throws IOException {
doTestShortCircuitRead(false, 3*blockSize+100, 777);
doTestShortCircuitRead(true, 3*blockSize+100, 777);
}
@Test
public void testLongFile() throws IOException {
doTestShortCircuitRead(false, 10*blockSize+100, 777);
doTestShortCircuitRead(true, 10*blockSize+100, 777);
}
@Test
public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
final Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, "alloweduser");
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.format(true).build();
cluster.waitActive();
final DataNode dn = cluster.getDataNodes().get(0);
FileSystem fs = cluster.getFileSystem();
try {
DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16, (short) 1, 23);
UserGroupInformation aUgi = UserGroupInformation
.createRemoteUser("alloweduser");
LocatedBlocks lb = cluster.getNameNode().getRpcServer()
.getBlockLocations("/tmp/x", 0, 16);
// Create a new block object, because the block inside LocatedBlock at
// namenode is of type BlockInfo.
ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
ClientDatanodeProtocol proxy = aUgi
.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
@Override
public ClientDatanodeProtocol run() throws Exception {
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
60000);
}
});
//This should succeed
BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
Assert.assertEquals(dn.data.getBlockLocalPathInfo(blk).getBlockPath(),
blpi.getBlockPath());
// Now try with a not allowed user.
UserGroupInformation bUgi = UserGroupInformation
.createRemoteUser("notalloweduser");
proxy = bUgi
.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
@Override
public ClientDatanodeProtocol run() throws Exception {
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
60000);
}
});
try {
proxy.getBlockLocalPathInfo(blk, token);
Assert.fail("The call should have failed as " + bUgi.getShortUserName()
+ " is not allowed to call getBlockLocalPathInfo");
} catch (IOException ex) {
Assert.assertTrue(ex.getMessage().contains(
"not allowed to call getBlockLocalPathInfo"));
}
} finally {
fs.close();
cluster.shutdown();
}
}
/**
* Test to run benchmarks between shortcircuit read vs regular read with
* specified number of threads simultaneously reading.
* <br>
* Run this using the following command:
* bin/hadoop --config confdir \
* org.apache.hadoop.hdfs.TestShortCircuitLocalRead \
* <shortcircuit on?> <checksum on?> <Number of threads>
*/
public static void main(String[] args) throws Exception {
if (args.length != 3) {
System.out.println("Usage: test shortcircuit checksum threadCount");
System.exit(1);
}
boolean shortcircuit = Boolean.valueOf(args[0]);
boolean checksum = Boolean.valueOf(args[1]);
int threadCount = Integer.valueOf(args[2]);
// Setup create a file
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
checksum);
//Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
// create a new file in home directory. Do not close it.
final Path file1 = new Path("filelocal.dat");
final FileSystem fs = FileSystem.get(conf);
FSDataOutputStream stm = createFile(fs, file1, 1);
stm.write(dataToWrite);
stm.close();
long start = System.currentTimeMillis();
final int iteration = 20;
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread() {
public void run() {
for (int i = 0; i < iteration; i++) {
try {
checkFileContent(fs, file1, dataToWrite, 0);
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
}
for (int i = 0; i < threadCount; i++) {
threads[i].start();
}
for (int i = 0; i < threadCount; i++) {
threads[i].join();
}
long end = System.currentTimeMillis();
System.out.println("Iteration " + iteration + " took " + (end - start));
fs.delete(file1, false);
}
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.BlockPoolSlice;
@ -991,4 +992,10 @@ public class SimulatedFSDataset implements FSDatasetInterface, Configurable{
}
return r;
}
@Override
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b)
throws IOException {
throw new IOException("getBlockLocalPathInfo not supported.");
}
} }