HDFS-4595. Merging 1456047 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1456048 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-03-13 16:54:10 +00:00
parent 7f75863029
commit e81ffd1705
6 changed files with 165 additions and 83 deletions

View File

@ -74,6 +74,9 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4484. libwebhdfs compilation broken with gcc 4.6.2. (Colin Patrick
McCabe via atm)
HDFS-4595. When short circuit read is fails, DFSClient does not fallback
to regular reads. (suresh)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -23,6 +23,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -31,6 +32,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@ -86,11 +89,21 @@ class BlockReaderLocal implements BlockReader {
}
private synchronized ClientDatanodeProtocol getDatanodeProxy(
DatanodeInfo node, Configuration conf, int socketTimeout,
boolean connectToDnViaHostname) throws IOException {
UserGroupInformation ugi, final DatanodeInfo node,
final Configuration conf, final int socketTimeout,
final boolean connectToDnViaHostname) throws IOException {
if (proxy == null) {
proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
socketTimeout, connectToDnViaHostname);
try {
proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
@Override
public ClientDatanodeProtocol run() throws Exception {
return DFSUtil.createClientDatanodeProtocolProxy(node, conf,
socketTimeout, connectToDnViaHostname);
}
});
} catch (InterruptedException e) {
LOG.warn("encountered exception ", e);
}
}
return proxy;
}
@ -154,17 +167,18 @@ class BlockReaderLocal implements BlockReader {
/**
* The only way this object can be instantiated.
*/
static BlockReaderLocal newBlockReader(Configuration conf, String file,
ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
int socketTimeout, long startOffset, long length,
boolean connectToDnViaHostname) throws IOException {
static BlockReaderLocal newBlockReader(UserGroupInformation ugi,
Configuration conf, String file, ExtendedBlock blk,
Token<BlockTokenIdentifier> token, DatanodeInfo node, int socketTimeout,
long startOffset, long length, boolean connectToDnViaHostname)
throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort());
// check the cache first
BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
if (pathinfo == null) {
pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
pathinfo = getBlockPathInfo(ugi, blk, node, conf, socketTimeout, token,
connectToDnViaHostname);
}
@ -241,13 +255,13 @@ class BlockReaderLocal implements BlockReader {
return ldInfo;
}
private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
DatanodeInfo node, Configuration conf, int timeout,
private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
throws IOException {
throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
BlockLocalPathInfo pathinfo = null;
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
conf, timeout, connectToDnViaHostname);
try {
// make RPC to local datanode to find local pathnames of blocks

View File

@ -414,6 +414,7 @@ public class DFSClient implements java.io.Closeable {
"null URI");
NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo =
NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
}
@ -782,12 +783,13 @@ public class DFSClient implements java.io.Closeable {
/**
* Get {@link BlockReader} for short circuited local reads.
*/
static BlockReader getLocalBlockReader(Configuration conf,
String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
boolean connectToDnViaHostname) throws InvalidToken, IOException {
static BlockReader getLocalBlockReader(UserGroupInformation ugi,
Configuration conf, String src, ExtendedBlock blk,
Token<BlockTokenIdentifier> accessToken, DatanodeInfo chosenNode,
int socketTimeout, long offsetIntoBlock, boolean connectToDnViaHostname)
throws InvalidToken, IOException {
try {
return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
return BlockReaderLocal.newBlockReader(ugi, conf, src, blk, accessToken,
chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
- offsetIntoBlock, connectToDnViaHostname);
} catch (RemoteException re) {
@ -1638,7 +1640,7 @@ public class DFSClient implements java.io.Closeable {
* @param socketFactory to create sockets to connect to DNs
* @param socketTimeout timeout to use when connecting and waiting for a response
* @param encryptionKey the key needed to communicate with DNs in this cluster
* @param connectToDnViaHostname {@see #connectToDnViaHostname()}
* @param connectToDnViaHostname {@link #connectToDnViaHostname()}
* @return The checksum
*/
static MD5MD5CRC32FileChecksum getFileChecksum(String src,
@ -2250,6 +2252,12 @@ public class DFSClient implements java.io.Closeable {
}
void disableShortCircuit() {
LOG.info("Short circuit is disabled");
shortCircuitLocalReads = false;
}
@VisibleForTesting
boolean getShortCircuitLocalReads() {
return shortCircuitLocalReads;
}
}

View File

@ -460,6 +460,10 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
" for " + blk);
}
return chosenNode;
} catch (AccessControlException ex) {
DFSClient.LOG.warn("Short circuit access failed " + ex);
dfsClient.disableShortCircuit();
continue;
} catch (IOException ex) {
if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
@ -806,7 +810,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
// we want to remember what we have tried
addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
} catch (AccessControlException ex) {
DFSClient.LOG.warn("Short circuit access failed ", ex);
DFSClient.LOG.warn("Short circuit access failed " + ex);
dfsClient.disableShortCircuit();
continue;
} catch (IOException e) {
@ -885,9 +889,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
// Can't local read a block under construction, see HDFS-2757
if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
!blockUnderConstruction()) {
return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
dfsClient.connectToDnViaHostname());
return DFSClient.getLocalBlockReader(dfsClient.ugi, dfsClient.conf,
src, block, blockToken, chosenNode, dfsClient.hdfsTimeout,
startOffset, dfsClient.connectToDnViaHostname());
}
IOException err = null;
@ -1027,8 +1031,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
* only report if the total number of replica is 1. We do not
* report otherwise since this maybe due to the client is a handicapped client
* (who can not read).
* @param corruptedBlockMap, map of corrupted blocks
* @param dataNodeCount, number of data nodes who contains the block replicas
* @param corruptedBlockMap map of corrupted blocks
* @param dataNodeCount number of data nodes who contains the block replicas
*/
private void reportCheckSumFailure(
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,

View File

@ -67,6 +67,8 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import com.google.common.annotations.VisibleForTesting;
/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
@ -564,9 +566,8 @@ public class DistributedFileSystem extends FileSystem {
return "DFS[" + dfs + "]";
}
/** @deprecated DFSClient should not be accessed directly. */
@InterfaceAudience.Private
@Deprecated
@VisibleForTesting
public DFSClient getClient() {
return dfs;
}

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
@ -32,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -85,9 +88,20 @@ public class TestShortCircuitLocalRead {
}
}
}
private static String getCurrentUser() throws IOException {
return UserGroupInformation.getCurrentUser().getShortUserName();
}
static void checkFileContent(FileSystem fs, Path name, byte[] expected,
int readOffset) throws IOException {
/** Check file content, reading as user {@code readingUser} */
static void checkFileContent(URI uri, Path name, byte[] expected,
int readOffset, String readingUser, Configuration conf,
boolean shortCircuitFails)
throws IOException, InterruptedException {
// Ensure short circuit is enabled
DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
assertTrue(fs.getClient().getShortCircuitLocalReads());
FSDataInputStream stm = fs.open(name);
byte[] actual = new byte[expected.length-readOffset];
stm.readFully(readOffset, actual);
@ -112,6 +126,11 @@ public class TestShortCircuitLocalRead {
nread += nbytes;
}
checkData(actual, readOffset, expected, "Read 3");
if (shortCircuitFails) {
// short circuit should be disabled due to failure
assertFalse(fs.getClient().getShortCircuitLocalReads());
}
stm.close();
}
@ -123,11 +142,15 @@ public class TestShortCircuitLocalRead {
return arr;
}
/**
* Verifies that reading a file with the direct read(ByteBuffer) api gives the expected set of bytes.
*/
static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected,
int readOffset) throws IOException {
/** Check the file content, reading as user {@code readingUser} */
static void checkFileContentDirect(URI uri, Path name, byte[] expected,
int readOffset, String readingUser, Configuration conf,
boolean shortCircuitFails)
throws IOException, InterruptedException {
// Ensure short circuit is enabled
DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
assertTrue(fs.getClient().getShortCircuitLocalReads());
DFSDataInputStream stm = (DFSDataInputStream)fs.open(name);
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
@ -157,21 +180,33 @@ public class TestShortCircuitLocalRead {
nread += nbytes;
}
checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
if (shortCircuitFails) {
// short circuit should be disabled due to failure
assertFalse(fs.getClient().getShortCircuitLocalReads());
}
stm.close();
}
public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
int readOffset) throws IOException, InterruptedException {
String shortCircuitUser = getCurrentUser();
doTestShortCircuitRead(ignoreChecksum, size, readOffset, shortCircuitUser,
shortCircuitUser, false);
}
/**
* Test that file data can be read by reading the block file
* directly from the local store.
*/
public void doTestShortCircuitRead(boolean ignoreChecksum, int size,
int readOffset) throws IOException {
int readOffset, String shortCircuitUser, String readingUser,
boolean shortCircuitFails) throws IOException, InterruptedException {
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
ignoreChecksum);
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName());
shortCircuitUser);
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
@ -184,53 +219,88 @@ public class TestShortCircuitLocalRead {
assertTrue("/ should be a directory", fs.getFileStatus(path)
.isDirectory() == true);
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
// create a new file in home directory. Do not close it.
Path file1 = new Path("filelocal.dat");
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
Path file1 = fs.makeQualified(new Path("filelocal.dat"));
FSDataOutputStream stm = createFile(fs, file1, 1);
// write to file
stm.write(fileData);
stm.close();
checkFileContent(fs, file1, fileData, readOffset);
checkFileContentDirect(fs, file1, fileData, readOffset);
URI uri = cluster.getURI();
checkFileContent(uri, file1, fileData, readOffset, readingUser, conf,
shortCircuitFails);
checkFileContentDirect(uri, file1, fileData, readOffset, readingUser,
conf, shortCircuitFails);
} finally {
fs.close();
cluster.shutdown();
}
}
@Test
public void testFileLocalReadNoChecksum() throws IOException {
@Test(timeout=10000)
public void testFileLocalReadNoChecksum() throws Exception {
doTestShortCircuitRead(true, 3*blockSize+100, 0);
}
@Test
public void testFileLocalReadChecksum() throws IOException {
@Test(timeout=10000)
public void testFileLocalReadChecksum() throws Exception {
doTestShortCircuitRead(false, 3*blockSize+100, 0);
}
@Test
public void testSmallFileLocalRead() throws IOException {
@Test(timeout=10000)
public void testSmallFileLocalRead() throws Exception {
doTestShortCircuitRead(false, 13, 0);
doTestShortCircuitRead(false, 13, 5);
doTestShortCircuitRead(true, 13, 0);
doTestShortCircuitRead(true, 13, 5);
}
@Test
public void testReadFromAnOffset() throws IOException {
/**
* Try a short circuit from a reader that is not allowed to
* to use short circuit. The test ensures reader falls back to non
* shortcircuit reads when shortcircuit is disallowed.
*/
@Test(timeout=10000)
public void testLocalReadFallback() throws Exception {
doTestShortCircuitRead(true, 13, 0, getCurrentUser(), "notallowed", true);
}
@Test(timeout=10000)
public void testReadFromAnOffset() throws Exception {
doTestShortCircuitRead(false, 3*blockSize+100, 777);
doTestShortCircuitRead(true, 3*blockSize+100, 777);
}
@Test
public void testLongFile() throws IOException {
@Test(timeout=10000)
public void testLongFile() throws Exception {
doTestShortCircuitRead(false, 10*blockSize+100, 777);
doTestShortCircuitRead(true, 10*blockSize+100, 777);
}
@Test
private ClientDatanodeProtocol getProxy(UserGroupInformation ugi,
final DatanodeID dnInfo, final Configuration conf) throws IOException,
InterruptedException {
return ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
@Override
public ClientDatanodeProtocol run() throws Exception {
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000,
false);
}
});
}
private static DistributedFileSystem getFileSystem(String user, final URI uri,
final Configuration conf) throws InterruptedException, IOException {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
return ugi.doAs(new PrivilegedExceptionAction<DistributedFileSystem>() {
@Override
public DistributedFileSystem run() throws Exception {
return (DistributedFileSystem)FileSystem.get(uri, conf);
}
});
}
@Test(timeout=10000)
public void testGetBlockLocalPathInfo() throws IOException, InterruptedException {
final Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
@ -253,15 +323,7 @@ public class TestShortCircuitLocalRead {
ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
ClientDatanodeProtocol proxy = aUgi1
.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
@Override
public ClientDatanodeProtocol run() throws Exception {
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
60000, false);
}
});
ClientDatanodeProtocol proxy = getProxy(aUgi1, dnInfo, conf);
// This should succeed
BlockLocalPathInfo blpi = proxy.getBlockLocalPathInfo(blk, token);
Assert.assertEquals(
@ -269,14 +331,7 @@ public class TestShortCircuitLocalRead {
blpi.getBlockPath());
// Try with the other allowed user
proxy = aUgi2
.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
@Override
public ClientDatanodeProtocol run() throws Exception {
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
60000, false);
}
});
proxy = getProxy(aUgi2, dnInfo, conf);
// This should succeed as well
blpi = proxy.getBlockLocalPathInfo(blk, token);
@ -287,14 +342,7 @@ public class TestShortCircuitLocalRead {
// Now try with a disallowed user
UserGroupInformation bUgi = UserGroupInformation
.createRemoteUser("notalloweduser");
proxy = bUgi
.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
@Override
public ClientDatanodeProtocol run() throws Exception {
return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
60000, false);
}
});
proxy = getProxy(bUgi, dnInfo, conf);
try {
proxy.getBlockLocalPathInfo(blk, token);
Assert.fail("The call should have failed as " + bUgi.getShortUserName()
@ -309,14 +357,14 @@ public class TestShortCircuitLocalRead {
}
}
@Test
@Test(timeout=10000)
public void testSkipWithVerifyChecksum() throws IOException {
int size = blockSize;
Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName());
getCurrentUser());
if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf);
}
@ -356,7 +404,7 @@ public class TestShortCircuitLocalRead {
}
/**
* Test to run benchmarks between shortcircuit read vs regular read with
* Test to run benchmarks between short circuit read vs regular read with
* specified number of threads simultaneously reading.
* <br>
* Run this using the following command:
@ -374,7 +422,7 @@ public class TestShortCircuitLocalRead {
int threadCount = Integer.valueOf(args[2]);
// Setup create a file
Configuration conf = new Configuration();
final Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, shortcircuit);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
checksum);
@ -400,9 +448,13 @@ public class TestShortCircuitLocalRead {
public void run() {
for (int i = 0; i < iteration; i++) {
try {
checkFileContent(fs, file1, dataToWrite, 0);
String user = getCurrentUser();
checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf,
true);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}