HDFS-9686. Remove useless boxing/unboxing code. Contributed by Kousuke Saruta.

(cherry picked from commit fe124da5ff)
(cherry picked from commit e4c01b8b1c)
This commit is contained in:
Akira Ajisaka 2016-02-06 19:31:01 +09:00
parent be115634f2
commit 3f8a30c956
6 changed files with 145 additions and 53 deletions

View File

@ -36,7 +36,9 @@ import java.text.MessageFormat;
* and uses its lifecycle to start and stop the server. * and uses its lifecycle to start and stop the server.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class ServerWebApp extends Server implements ServletContextListener { public abstract class ServerWebApp
extends Server
implements ServletContextListener {
private static final String HOME_DIR = ".home.dir"; private static final String HOME_DIR = ".home.dir";
private static final String CONFIG_DIR = ".config.dir"; private static final String CONFIG_DIR = ".config.dir";
@ -61,8 +63,8 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
/** /**
* Constructor for testing purposes. * Constructor for testing purposes.
*/ */
protected ServerWebApp(String name, String homeDir, String configDir, String logDir, String tempDir, protected ServerWebApp(String name, String homeDir, String configDir,
Configuration config) { String logDir, String tempDir, Configuration config) {
super(name, homeDir, configDir, logDir, tempDir, config); super(name, homeDir, configDir, logDir, tempDir, config);
} }
@ -120,7 +122,8 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
String sysProp = name + HOME_DIR; String sysProp = name + HOME_DIR;
homeDir = System.getProperty(sysProp); homeDir = System.getProperty(sysProp);
if (homeDir == null) { if (homeDir == null) {
throw new IllegalArgumentException(MessageFormat.format("System property [{0}] not defined", sysProp)); throw new IllegalArgumentException(MessageFormat.format(
"System property [{0}] not defined", sysProp));
} }
} }
return homeDir; return homeDir;
@ -160,7 +163,8 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
} }
/** /**
* Resolves the host and port InetSocketAddress the web server is listening to. * Resolves the host and port InetSocketAddress the
* web server is listening to.
* <p> * <p>
* This implementation looks for the following 2 properties: * This implementation looks for the following 2 properties:
* <ul> * <ul>
@ -168,8 +172,10 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
* <li>#SERVER_NAME#.http.port</li> * <li>#SERVER_NAME#.http.port</li>
* </ul> * </ul>
* *
* @return the host and port InetSocketAddress the web server is listening to. * @return the host and port InetSocketAddress the
* @throws ServerException thrown if any of the above 2 properties is not defined. * web server is listening to.
* @throws ServerException thrown
* if any of the above 2 properties is not defined.
*/ */
protected InetSocketAddress resolveAuthority() throws ServerException { protected InetSocketAddress resolveAuthority() throws ServerException {
String hostnameKey = getName() + HTTP_HOSTNAME; String hostnameKey = getName() + HTTP_HOSTNAME;
@ -233,6 +239,7 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
* *
*/ */
public boolean isSslEnabled() { public boolean isSslEnabled() {
return Boolean.valueOf(System.getProperty(getName() + SSL_ENABLED, "false")); return Boolean.parseBoolean(
System.getProperty(getName() + SSL_ENABLED, "false"));
} }
} }

View File

@ -967,6 +967,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9726. Refactor IBR code to a new class. (szetszwo) HDFS-9726. Refactor IBR code to a new class. (szetszwo)
HDFS-9686. Remove useless boxing/unboxing code.
(Kousuke Saruta via aajisaka)
BUG FIXES BUG FIXES
HDFS-8091: ACLStatus and XAttributes should be presented to HDFS-8091: ACLStatus and XAttributes should be presented to

View File

@ -416,7 +416,9 @@ public abstract class FSEditLogOp {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp { static abstract class AddCloseOp
extends FSEditLogOp
implements BlockListUpdatingOp {
int length; int length;
long inodeId; long inodeId;
String path; String path;
@ -637,7 +639,8 @@ public abstract class FSEditLogOp {
NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) { NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) {
this.storagePolicyId = FSImageSerialization.readByte(in); this.storagePolicyId = FSImageSerialization.readByte(in);
} else { } else {
this.storagePolicyId = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED; this.storagePolicyId =
HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
} }
// read clientId and callId // read clientId and callId
readRpcIds(in, logVersion); readRpcIds(in, logVersion);
@ -717,7 +720,7 @@ public abstract class FSEditLogOp {
Long.toString(inodeId)); Long.toString(inodeId));
XMLUtils.addSaxString(contentHandler, "PATH", path); XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "REPLICATION", XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.valueOf(replication).toString()); Short.toString(replication));
XMLUtils.addSaxString(contentHandler, "MTIME", XMLUtils.addSaxString(contentHandler, "MTIME",
Long.toString(mtime)); Long.toString(mtime));
XMLUtils.addSaxString(contentHandler, "ATIME", XMLUtils.addSaxString(contentHandler, "ATIME",
@ -745,7 +748,7 @@ public abstract class FSEditLogOp {
this.length = Integer.parseInt(st.getValue("LENGTH")); this.length = Integer.parseInt(st.getValue("LENGTH"));
this.inodeId = Long.parseLong(st.getValue("INODEID")); this.inodeId = Long.parseLong(st.getValue("INODEID"));
this.path = st.getValue("PATH"); this.path = st.getValue("PATH");
this.replication = Short.valueOf(st.getValue("REPLICATION")); this.replication = Short.parseShort(st.getValue("REPLICATION"));
this.mtime = Long.parseLong(st.getValue("MTIME")); this.mtime = Long.parseLong(st.getValue("MTIME"));
this.atime = Long.parseLong(st.getValue("ATIME")); this.atime = Long.parseLong(st.getValue("ATIME"));
this.blockSize = Long.parseLong(st.getValue("BLOCKSIZE")); this.blockSize = Long.parseLong(st.getValue("BLOCKSIZE"));
@ -1186,12 +1189,12 @@ public abstract class FSEditLogOp {
protected void toXml(ContentHandler contentHandler) throws SAXException { protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "PATH", path); XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "REPLICATION", XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.valueOf(replication).toString()); Short.toString(replication));
} }
@Override void fromXml(Stanza st) throws InvalidXmlException { @Override void fromXml(Stanza st) throws InvalidXmlException {
this.path = st.getValue("PATH"); this.path = st.getValue("PATH");
this.replication = Short.valueOf(st.getValue("REPLICATION")); this.replication = Short.parseShort(st.getValue("REPLICATION"));
} }
} }
@ -1979,13 +1982,13 @@ public abstract class FSEditLogOp {
protected void toXml(ContentHandler contentHandler) throws SAXException { protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SRC", src); XMLUtils.addSaxString(contentHandler, "SRC", src);
XMLUtils.addSaxString(contentHandler, "MODE", XMLUtils.addSaxString(contentHandler, "MODE",
Short.valueOf(permissions.toShort()).toString()); Short.toString(permissions.toShort()));
} }
@Override void fromXml(Stanza st) throws InvalidXmlException { @Override void fromXml(Stanza st) throws InvalidXmlException {
this.src = st.getValue("SRC"); this.src = st.getValue("SRC");
this.permissions = new FsPermission( this.permissions = new FsPermission(
Short.valueOf(st.getValue("MODE"))); Short.parseShort(st.getValue("MODE")));
} }
} }
@ -4469,13 +4472,13 @@ public abstract class FSEditLogOp {
protected void toXml(ContentHandler contentHandler) throws SAXException { protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "PATH", path); XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "POLICYID", XMLUtils.addSaxString(contentHandler, "POLICYID",
Byte.valueOf(policyId).toString()); Byte.toString(policyId));
} }
@Override @Override
void fromXml(Stanza st) throws InvalidXmlException { void fromXml(Stanza st) throws InvalidXmlException {
this.path = st.getValue("PATH"); this.path = st.getValue("PATH");
this.policyId = Byte.valueOf(st.getValue("POLICYID")); this.policyId = Byte.parseByte(st.getValue("POLICYID"));
} }
} }
@ -4952,7 +4955,8 @@ public abstract class FSEditLogOp {
public static void delegationTokenToXml(ContentHandler contentHandler, public static void delegationTokenToXml(ContentHandler contentHandler,
DelegationTokenIdentifier token) throws SAXException { DelegationTokenIdentifier token) throws SAXException {
contentHandler.startElement("", "", "DELEGATION_TOKEN_IDENTIFIER", new AttributesImpl()); contentHandler.startElement(
"", "", "DELEGATION_TOKEN_IDENTIFIER", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "KIND", token.getKind().toString()); XMLUtils.addSaxString(contentHandler, "KIND", token.getKind().toString());
XMLUtils.addSaxString(contentHandler, "SEQUENCE_NUMBER", XMLUtils.addSaxString(contentHandler, "SEQUENCE_NUMBER",
Integer.toString(token.getSequenceNumber())); Integer.toString(token.getSequenceNumber()));
@ -4998,7 +5002,8 @@ public abstract class FSEditLogOp {
public static void delegationKeyToXml(ContentHandler contentHandler, public static void delegationKeyToXml(ContentHandler contentHandler,
DelegationKey key) throws SAXException { DelegationKey key) throws SAXException {
contentHandler.startElement("", "", "DELEGATION_KEY", new AttributesImpl()); contentHandler.startElement(
"", "", "DELEGATION_KEY", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "KEY_ID", XMLUtils.addSaxString(contentHandler, "KEY_ID",
Integer.toString(key.getKeyId())); Integer.toString(key.getKeyId()));
XMLUtils.addSaxString(contentHandler, "EXPIRY_DATE", XMLUtils.addSaxString(contentHandler, "EXPIRY_DATE",
@ -5026,7 +5031,8 @@ public abstract class FSEditLogOp {
public static void permissionStatusToXml(ContentHandler contentHandler, public static void permissionStatusToXml(ContentHandler contentHandler,
PermissionStatus perm) throws SAXException { PermissionStatus perm) throws SAXException {
contentHandler.startElement("", "", "PERMISSION_STATUS", new AttributesImpl()); contentHandler.startElement(
"", "", "PERMISSION_STATUS", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "USERNAME", perm.getUserName()); XMLUtils.addSaxString(contentHandler, "USERNAME", perm.getUserName());
XMLUtils.addSaxString(contentHandler, "GROUPNAME", perm.getGroupName()); XMLUtils.addSaxString(contentHandler, "GROUPNAME", perm.getGroupName());
fsPermissionToXml(contentHandler, perm.getPermission()); fsPermissionToXml(contentHandler, perm.getPermission());
@ -5044,13 +5050,13 @@ public abstract class FSEditLogOp {
public static void fsPermissionToXml(ContentHandler contentHandler, public static void fsPermissionToXml(ContentHandler contentHandler,
FsPermission mode) throws SAXException { FsPermission mode) throws SAXException {
XMLUtils.addSaxString(contentHandler, "MODE", Short.valueOf(mode.toShort()) XMLUtils.addSaxString(contentHandler, "MODE",
.toString()); Short.toString(mode.toShort()));
} }
public static FsPermission fsPermissionFromXml(Stanza st) public static FsPermission fsPermissionFromXml(Stanza st)
throws InvalidXmlException { throws InvalidXmlException {
short mode = Short.valueOf(st.getValue("MODE")); short mode = Short.parseShort(st.getValue("MODE"));
return new FsPermission(mode); return new FsPermission(mode);
} }
@ -5059,7 +5065,8 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "PERM", v.SYMBOL); XMLUtils.addSaxString(contentHandler, "PERM", v.SYMBOL);
} }
private static FsAction fsActionFromXml(Stanza st) throws InvalidXmlException { private static FsAction fsActionFromXml(Stanza st)
throws InvalidXmlException {
FsAction v = FSACTION_SYMBOL_MAP.get(st.getValue("PERM")); FsAction v = FSACTION_SYMBOL_MAP.get(st.getValue("PERM"));
if (v == null) if (v == null)
throw new InvalidXmlException("Invalid value for FsAction"); throw new InvalidXmlException("Invalid value for FsAction");

View File

@ -173,7 +173,8 @@ public class RollingWindowManager {
* @param user the user that updated the metric * @param user the user that updated the metric
* @param delta the amount of change in the metric, e.g., +1 * @param delta the amount of change in the metric, e.g., +1
*/ */
public void recordMetric(long time, String command, String user, long delta) { public void recordMetric(long time, String command,
String user, long delta) {
RollingWindow window = getRollingWindow(command, user); RollingWindow window = getRollingWindow(command, user);
window.incAt(time, delta); window.incAt(time, delta);
} }
@ -208,7 +209,7 @@ public class RollingWindowManager {
} }
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
NameValuePair userEntry = reverse.pop(); NameValuePair userEntry = reverse.pop();
User user = new User(userEntry.name, Long.valueOf(userEntry.value)); User user = new User(userEntry.name, userEntry.value);
op.addUser(user); op.addUser(user);
} }
} }
@ -243,7 +244,8 @@ public class RollingWindowManager {
metricName, userName, windowSum); metricName, userName, windowSum);
topN.offer(new NameValuePair(userName, windowSum)); topN.offer(new NameValuePair(userName, windowSum));
} }
LOG.debug("topN users size for command {} is: {}", metricName, topN.size()); LOG.debug("topN users size for command {} is: {}",
metricName, topN.size());
return topN; return topN;
} }

View File

@ -72,9 +72,9 @@ import org.junit.Test;
/** /**
* Test for short circuit read functionality using {@link BlockReaderLocal}. * Test for short circuit read functionality using {@link BlockReaderLocal}.
* When a block is being read by a client is on the local datanode, instead of * When a block is being read by a client is on the local datanode, instead of
* using {@link DataTransferProtocol} and connect to datanode, the short circuit * using {@link DataTransferProtocol} and connect to datanode,
* read allows reading the file directly from the files on the local file * the short circuit read allows reading the file directly
* system. * from the files on the local file system.
*/ */
public class TestShortCircuitLocalRead { public class TestShortCircuitLocalRead {
private static TemporarySocketDirectory sockDir; private static TemporarySocketDirectory sockDir;
@ -195,7 +195,8 @@ public class TestShortCircuitLocalRead {
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name); HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset); ByteBuffer actual =
ByteBuffer.allocateDirect(expected.length - readOffset);
IOUtils.skipFully(stm, readOffset); IOUtils.skipFully(stm, readOffset);
@ -230,7 +231,8 @@ public class TestShortCircuitLocalRead {
public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size, public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size,
int readOffset, String shortCircuitUser, String readingUser, int readOffset, String shortCircuitUser, String readingUser,
boolean legacyShortCircuitFails) throws IOException, InterruptedException { boolean legacyShortCircuitFails)
throws IOException, InterruptedException {
doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset, doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
shortCircuitUser, readingUser, legacyShortCircuitFails); shortCircuitUser, readingUser, legacyShortCircuitFails);
} }
@ -247,7 +249,8 @@ public class TestShortCircuitLocalRead {
*/ */
public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size, public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
int readOffset, String shortCircuitUser, String readingUser, int readOffset, String shortCircuitUser, String readingUser,
boolean legacyShortCircuitFails) throws IOException, InterruptedException { boolean legacyShortCircuitFails)
throws IOException, InterruptedException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
@ -262,7 +265,8 @@ public class TestShortCircuitLocalRead {
if (shortCircuitUser != null) { if (shortCircuitUser != null) {
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
shortCircuitUser); shortCircuitUser);
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true); conf.setBoolean(
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
} }
if (simulatedStorage) { if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf); SimulatedFSDataset.setFactory(conf);
@ -324,7 +328,8 @@ public class TestShortCircuitLocalRead {
*/ */
@Test(timeout=60000) @Test(timeout=60000)
public void testLocalReadFallback() throws Exception { public void testLocalReadFallback() throws Exception {
doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(), "notallowed", true); doTestShortCircuitReadLegacy(
true, 13, 0, getCurrentUser(), "notallowed", true);
} }
@Test(timeout=60000) @Test(timeout=60000)
@ -366,8 +371,9 @@ public class TestShortCircuitLocalRead {
ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock()); ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken(); Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0]; final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
ClientDatanodeProtocol proxy = ClientDatanodeProtocol proxy =
DFSUtilClient.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false); DFSUtilClient.createClientDatanodeProtocolProxy(
dnInfo, conf, 60000, false);
try { try {
proxy.getBlockLocalPathInfo(blk, token); proxy.getBlockLocalPathInfo(blk, token);
Assert.fail("The call should have failed as this user " Assert.fail("The call should have failed as this user "
@ -387,7 +393,8 @@ public class TestShortCircuitLocalRead {
int size = blockSize; int size = blockSize;
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false); conf.setBoolean(
HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), new File(sockDir.getDir(),
"testSkipWithVerifyChecksum._PORT.sock").getAbsolutePath()); "testSkipWithVerifyChecksum._PORT.sock").getAbsolutePath());
@ -434,7 +441,8 @@ public class TestShortCircuitLocalRead {
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false); conf.setBoolean(
HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), new File(sockDir.getDir(),
"testHandleTruncatedBlockFile._PORT.sock").getAbsolutePath()); "testHandleTruncatedBlockFile._PORT.sock").getAbsolutePath());
@ -523,8 +531,8 @@ public class TestShortCircuitLocalRead {
System.out.println("Usage: test shortcircuit checksum threadCount"); System.out.println("Usage: test shortcircuit checksum threadCount");
System.exit(1); System.exit(1);
} }
boolean shortcircuit = Boolean.valueOf(args[0]); boolean shortcircuit = Boolean.parseBoolean(args[0]);
boolean checksum = Boolean.valueOf(args[1]); boolean checksum = Boolean.parseBoolean(args[1]);
int threadCount = Integer.parseInt(args[2]); int threadCount = Integer.parseInt(args[2]);
// Setup create a file // Setup create a file
@ -535,7 +543,8 @@ public class TestShortCircuitLocalRead {
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
checksum); checksum);
// Override fileSize and DATA_TO_WRITE to much larger values for benchmark test // Override fileSize and DATA_TO_WRITE to
// much larger values for benchmark test
int fileSize = 1000 * blockSize + 100; // File with 1000 blocks int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize); final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
@ -557,7 +566,8 @@ public class TestShortCircuitLocalRead {
for (int i = 0; i < iteration; i++) { for (int i = 0; i < iteration; i++) {
try { try {
String user = getCurrentUser(); String user = getCurrentUser();
checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf, true); checkFileContent(
fs.getUri(), file1, dataToWrite, 0, user, conf, true);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -590,11 +600,13 @@ public class TestShortCircuitLocalRead {
* through RemoteBlockReader * through RemoteBlockReader
* @throws IOException * @throws IOException
*/ */
public void doTestShortCircuitReadWithRemoteBlockReader(boolean ignoreChecksum, public void doTestShortCircuitReadWithRemoteBlockReader(
int size, String shortCircuitUser, int readOffset, boolean ignoreChecksum, int size, String shortCircuitUser,
boolean shortCircuitFails) throws IOException, InterruptedException { int readOffset, boolean shortCircuitFails)
throws IOException, InterruptedException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true); conf.setBoolean(
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
@ -603,7 +615,8 @@ public class TestShortCircuitLocalRead {
// check that / exists // check that / exists
Path path = new Path("/"); Path path = new Path("/");
URI uri = cluster.getURI(); URI uri = cluster.getURI();
assertTrue("/ should be a directory", fs.getFileStatus(path).isDirectory()); assertTrue(
"/ should be a directory", fs.getFileStatus(path).isDirectory());
byte[] fileData = AppendTestUtil.randomBytes(seed, size); byte[] fileData = AppendTestUtil.randomBytes(seed, size);
Path file1 = new Path("filelocal.dat"); Path file1 = new Path("filelocal.dat");
@ -615,10 +628,12 @@ public class TestShortCircuitLocalRead {
checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser,
conf, shortCircuitFails); conf, shortCircuitFails);
//RemoteBlockReader have unsupported method read(ByteBuffer bf) //RemoteBlockReader have unsupported method read(ByteBuffer bf)
assertTrue("RemoteBlockReader unsupported method read(ByteBuffer bf) error", assertTrue(
checkUnsupportedMethod(fs, file1, fileData, readOffset)); "RemoteBlockReader unsupported method read(ByteBuffer bf) error",
checkUnsupportedMethod(fs, file1, fileData, readOffset));
} catch(IOException e) { } catch(IOException e) {
throw new IOException("doTestShortCircuitReadWithRemoteBlockReader ex error ", e); throw new IOException(
"doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
} catch(InterruptedException inEx) { } catch(InterruptedException inEx) {
throw inEx; throw inEx;
} finally { } finally {
@ -630,7 +645,8 @@ public class TestShortCircuitLocalRead {
private boolean checkUnsupportedMethod(FileSystem fs, Path file, private boolean checkUnsupportedMethod(FileSystem fs, Path file,
byte[] expected, int readOffset) throws IOException { byte[] expected, int readOffset) throws IOException {
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file); HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset); ByteBuffer actual =
ByteBuffer.allocateDirect(expected.length - readOffset);
IOUtils.skipFully(stm, readOffset); IOUtils.skipFully(stm, readOffset);
try { try {
stm.read(actual); stm.read(actual);

View File

@ -245,6 +245,63 @@ import org.junit.Test;
verify(fs).delete(stagingJobPath, true); verify(fs).delete(stagingJobPath, true);
} }
@Test
public void testByPreserveFailedStaging() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
// Failed task's staging files should be kept
conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
//Staging Dir exists
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
JobStateInternal.FAILED, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
//test whether notifyIsLastAMRetry called
Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
verify(fs, times(0)).delete(stagingJobPath, true);
}
@Test
public void testPreservePatternMatchedStaging() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
// The staging files that are matched to the pattern
// should not be deleted
conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir");
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
//Staging Dir exists
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
//test whether notifyIsLastAMRetry called
Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
verify(fs, times(0)).delete(stagingJobPath, true);
}
private class TestMRApp extends MRAppMaster { private class TestMRApp extends MRAppMaster {
ContainerAllocator allocator; ContainerAllocator allocator;
boolean testIsLastAMRetry = false; boolean testIsLastAMRetry = false;