Revert "HDFS-9686. Remove useless boxing/unboxing code. Contributed by Kousuke Saruta."

This reverts commit e4c01b8b1c.
This commit is contained in:
Akira Ajisaka 2016-02-10 23:45:15 +09:00
parent 179b36efb5
commit 15ef3a85d9
6 changed files with 53 additions and 145 deletions

View File

@ -36,9 +36,7 @@ import java.text.MessageFormat;
* and uses its lifecycle to start and stop the server. * and uses its lifecycle to start and stop the server.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class ServerWebApp public abstract class ServerWebApp extends Server implements ServletContextListener {
extends Server
implements ServletContextListener {
private static final String HOME_DIR = ".home.dir"; private static final String HOME_DIR = ".home.dir";
private static final String CONFIG_DIR = ".config.dir"; private static final String CONFIG_DIR = ".config.dir";
@ -63,8 +61,8 @@ public abstract class ServerWebApp
/** /**
* Constructor for testing purposes. * Constructor for testing purposes.
*/ */
protected ServerWebApp(String name, String homeDir, String configDir, protected ServerWebApp(String name, String homeDir, String configDir, String logDir, String tempDir,
String logDir, String tempDir, Configuration config) { Configuration config) {
super(name, homeDir, configDir, logDir, tempDir, config); super(name, homeDir, configDir, logDir, tempDir, config);
} }
@ -122,8 +120,7 @@ public abstract class ServerWebApp
String sysProp = name + HOME_DIR; String sysProp = name + HOME_DIR;
homeDir = System.getProperty(sysProp); homeDir = System.getProperty(sysProp);
if (homeDir == null) { if (homeDir == null) {
throw new IllegalArgumentException(MessageFormat.format( throw new IllegalArgumentException(MessageFormat.format("System property [{0}] not defined", sysProp));
"System property [{0}] not defined", sysProp));
} }
} }
return homeDir; return homeDir;
@ -163,8 +160,7 @@ public abstract class ServerWebApp
} }
/** /**
* Resolves the host and port InetSocketAddress the * Resolves the host and port InetSocketAddress the web server is listening to.
* web server is listening to.
* <p> * <p>
* This implementation looks for the following 2 properties: * This implementation looks for the following 2 properties:
* <ul> * <ul>
@ -172,10 +168,8 @@ public abstract class ServerWebApp
* <li>#SERVER_NAME#.http.port</li> * <li>#SERVER_NAME#.http.port</li>
* </ul> * </ul>
* *
* @return the host and port InetSocketAddress the * @return the host and port InetSocketAddress the web server is listening to.
* web server is listening to. * @throws ServerException thrown if any of the above 2 properties is not defined.
* @throws ServerException thrown
* if any of the above 2 properties is not defined.
*/ */
protected InetSocketAddress resolveAuthority() throws ServerException { protected InetSocketAddress resolveAuthority() throws ServerException {
String hostnameKey = getName() + HTTP_HOSTNAME; String hostnameKey = getName() + HTTP_HOSTNAME;
@ -239,7 +233,6 @@ public abstract class ServerWebApp
* *
*/ */
public boolean isSslEnabled() { public boolean isSslEnabled() {
return Boolean.parseBoolean( return Boolean.valueOf(System.getProperty(getName() + SSL_ENABLED, "false"));
System.getProperty(getName() + SSL_ENABLED, "false"));
} }
} }

View File

@ -1045,9 +1045,6 @@ Release 2.8.0 - UNRELEASED
HDFS-9726. Refactor IBR code to a new class. (szetszwo) HDFS-9726. Refactor IBR code to a new class. (szetszwo)
HDFS-9686. Remove useless boxing/unboxing code.
(Kousuke Saruta via aajisaka)
BUG FIXES BUG FIXES
HDFS-8091: ACLStatus and XAttributes should be presented to HDFS-8091: ACLStatus and XAttributes should be presented to

View File

@ -416,9 +416,7 @@ public abstract class FSEditLogOp {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static abstract class AddCloseOp static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
extends FSEditLogOp
implements BlockListUpdatingOp {
int length; int length;
long inodeId; long inodeId;
String path; String path;
@ -639,8 +637,7 @@ public abstract class FSEditLogOp {
NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) { NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) {
this.storagePolicyId = FSImageSerialization.readByte(in); this.storagePolicyId = FSImageSerialization.readByte(in);
} else { } else {
this.storagePolicyId = this.storagePolicyId = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
} }
// read clientId and callId // read clientId and callId
readRpcIds(in, logVersion); readRpcIds(in, logVersion);
@ -720,7 +717,7 @@ public abstract class FSEditLogOp {
Long.toString(inodeId)); Long.toString(inodeId));
XMLUtils.addSaxString(contentHandler, "PATH", path); XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "REPLICATION", XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(replication)); Short.valueOf(replication).toString());
XMLUtils.addSaxString(contentHandler, "MTIME", XMLUtils.addSaxString(contentHandler, "MTIME",
Long.toString(mtime)); Long.toString(mtime));
XMLUtils.addSaxString(contentHandler, "ATIME", XMLUtils.addSaxString(contentHandler, "ATIME",
@ -748,7 +745,7 @@ public abstract class FSEditLogOp {
this.length = Integer.parseInt(st.getValue("LENGTH")); this.length = Integer.parseInt(st.getValue("LENGTH"));
this.inodeId = Long.parseLong(st.getValue("INODEID")); this.inodeId = Long.parseLong(st.getValue("INODEID"));
this.path = st.getValue("PATH"); this.path = st.getValue("PATH");
this.replication = Short.parseShort(st.getValue("REPLICATION")); this.replication = Short.valueOf(st.getValue("REPLICATION"));
this.mtime = Long.parseLong(st.getValue("MTIME")); this.mtime = Long.parseLong(st.getValue("MTIME"));
this.atime = Long.parseLong(st.getValue("ATIME")); this.atime = Long.parseLong(st.getValue("ATIME"));
this.blockSize = Long.parseLong(st.getValue("BLOCKSIZE")); this.blockSize = Long.parseLong(st.getValue("BLOCKSIZE"));
@ -1189,12 +1186,12 @@ public abstract class FSEditLogOp {
protected void toXml(ContentHandler contentHandler) throws SAXException { protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "PATH", path); XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "REPLICATION", XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(replication)); Short.valueOf(replication).toString());
} }
@Override void fromXml(Stanza st) throws InvalidXmlException { @Override void fromXml(Stanza st) throws InvalidXmlException {
this.path = st.getValue("PATH"); this.path = st.getValue("PATH");
this.replication = Short.parseShort(st.getValue("REPLICATION")); this.replication = Short.valueOf(st.getValue("REPLICATION"));
} }
} }
@ -1982,13 +1979,13 @@ public abstract class FSEditLogOp {
protected void toXml(ContentHandler contentHandler) throws SAXException { protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "SRC", src); XMLUtils.addSaxString(contentHandler, "SRC", src);
XMLUtils.addSaxString(contentHandler, "MODE", XMLUtils.addSaxString(contentHandler, "MODE",
Short.toString(permissions.toShort())); Short.valueOf(permissions.toShort()).toString());
} }
@Override void fromXml(Stanza st) throws InvalidXmlException { @Override void fromXml(Stanza st) throws InvalidXmlException {
this.src = st.getValue("SRC"); this.src = st.getValue("SRC");
this.permissions = new FsPermission( this.permissions = new FsPermission(
Short.parseShort(st.getValue("MODE"))); Short.valueOf(st.getValue("MODE")));
} }
} }
@ -4472,13 +4469,13 @@ public abstract class FSEditLogOp {
protected void toXml(ContentHandler contentHandler) throws SAXException { protected void toXml(ContentHandler contentHandler) throws SAXException {
XMLUtils.addSaxString(contentHandler, "PATH", path); XMLUtils.addSaxString(contentHandler, "PATH", path);
XMLUtils.addSaxString(contentHandler, "POLICYID", XMLUtils.addSaxString(contentHandler, "POLICYID",
Byte.toString(policyId)); Byte.valueOf(policyId).toString());
} }
@Override @Override
void fromXml(Stanza st) throws InvalidXmlException { void fromXml(Stanza st) throws InvalidXmlException {
this.path = st.getValue("PATH"); this.path = st.getValue("PATH");
this.policyId = Byte.parseByte(st.getValue("POLICYID")); this.policyId = Byte.valueOf(st.getValue("POLICYID"));
} }
} }
@ -4955,8 +4952,7 @@ public abstract class FSEditLogOp {
public static void delegationTokenToXml(ContentHandler contentHandler, public static void delegationTokenToXml(ContentHandler contentHandler,
DelegationTokenIdentifier token) throws SAXException { DelegationTokenIdentifier token) throws SAXException {
contentHandler.startElement( contentHandler.startElement("", "", "DELEGATION_TOKEN_IDENTIFIER", new AttributesImpl());
"", "", "DELEGATION_TOKEN_IDENTIFIER", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "KIND", token.getKind().toString()); XMLUtils.addSaxString(contentHandler, "KIND", token.getKind().toString());
XMLUtils.addSaxString(contentHandler, "SEQUENCE_NUMBER", XMLUtils.addSaxString(contentHandler, "SEQUENCE_NUMBER",
Integer.toString(token.getSequenceNumber())); Integer.toString(token.getSequenceNumber()));
@ -5002,8 +4998,7 @@ public abstract class FSEditLogOp {
public static void delegationKeyToXml(ContentHandler contentHandler, public static void delegationKeyToXml(ContentHandler contentHandler,
DelegationKey key) throws SAXException { DelegationKey key) throws SAXException {
contentHandler.startElement( contentHandler.startElement("", "", "DELEGATION_KEY", new AttributesImpl());
"", "", "DELEGATION_KEY", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "KEY_ID", XMLUtils.addSaxString(contentHandler, "KEY_ID",
Integer.toString(key.getKeyId())); Integer.toString(key.getKeyId()));
XMLUtils.addSaxString(contentHandler, "EXPIRY_DATE", XMLUtils.addSaxString(contentHandler, "EXPIRY_DATE",
@ -5031,8 +5026,7 @@ public abstract class FSEditLogOp {
public static void permissionStatusToXml(ContentHandler contentHandler, public static void permissionStatusToXml(ContentHandler contentHandler,
PermissionStatus perm) throws SAXException { PermissionStatus perm) throws SAXException {
contentHandler.startElement( contentHandler.startElement("", "", "PERMISSION_STATUS", new AttributesImpl());
"", "", "PERMISSION_STATUS", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "USERNAME", perm.getUserName()); XMLUtils.addSaxString(contentHandler, "USERNAME", perm.getUserName());
XMLUtils.addSaxString(contentHandler, "GROUPNAME", perm.getGroupName()); XMLUtils.addSaxString(contentHandler, "GROUPNAME", perm.getGroupName());
fsPermissionToXml(contentHandler, perm.getPermission()); fsPermissionToXml(contentHandler, perm.getPermission());
@ -5050,13 +5044,13 @@ public abstract class FSEditLogOp {
public static void fsPermissionToXml(ContentHandler contentHandler, public static void fsPermissionToXml(ContentHandler contentHandler,
FsPermission mode) throws SAXException { FsPermission mode) throws SAXException {
XMLUtils.addSaxString(contentHandler, "MODE", XMLUtils.addSaxString(contentHandler, "MODE", Short.valueOf(mode.toShort())
Short.toString(mode.toShort())); .toString());
} }
public static FsPermission fsPermissionFromXml(Stanza st) public static FsPermission fsPermissionFromXml(Stanza st)
throws InvalidXmlException { throws InvalidXmlException {
short mode = Short.parseShort(st.getValue("MODE")); short mode = Short.valueOf(st.getValue("MODE"));
return new FsPermission(mode); return new FsPermission(mode);
} }
@ -5065,8 +5059,7 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "PERM", v.SYMBOL); XMLUtils.addSaxString(contentHandler, "PERM", v.SYMBOL);
} }
private static FsAction fsActionFromXml(Stanza st) private static FsAction fsActionFromXml(Stanza st) throws InvalidXmlException {
throws InvalidXmlException {
FsAction v = FSACTION_SYMBOL_MAP.get(st.getValue("PERM")); FsAction v = FSACTION_SYMBOL_MAP.get(st.getValue("PERM"));
if (v == null) if (v == null)
throw new InvalidXmlException("Invalid value for FsAction"); throw new InvalidXmlException("Invalid value for FsAction");

View File

@ -173,8 +173,7 @@ public class RollingWindowManager {
* @param user the user that updated the metric * @param user the user that updated the metric
* @param delta the amount of change in the metric, e.g., +1 * @param delta the amount of change in the metric, e.g., +1
*/ */
public void recordMetric(long time, String command, public void recordMetric(long time, String command, String user, long delta) {
String user, long delta) {
RollingWindow window = getRollingWindow(command, user); RollingWindow window = getRollingWindow(command, user);
window.incAt(time, delta); window.incAt(time, delta);
} }
@ -209,7 +208,7 @@ public class RollingWindowManager {
} }
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
NameValuePair userEntry = reverse.pop(); NameValuePair userEntry = reverse.pop();
User user = new User(userEntry.name, userEntry.value); User user = new User(userEntry.name, Long.valueOf(userEntry.value));
op.addUser(user); op.addUser(user);
} }
} }
@ -244,8 +243,7 @@ public class RollingWindowManager {
metricName, userName, windowSum); metricName, userName, windowSum);
topN.offer(new NameValuePair(userName, windowSum)); topN.offer(new NameValuePair(userName, windowSum));
} }
LOG.debug("topN users size for command {} is: {}", LOG.debug("topN users size for command {} is: {}", metricName, topN.size());
metricName, topN.size());
return topN; return topN;
} }

View File

@ -72,9 +72,9 @@ import org.junit.Test;
/** /**
* Test for short circuit read functionality using {@link BlockReaderLocal}. * Test for short circuit read functionality using {@link BlockReaderLocal}.
* When a block is being read by a client is on the local datanode, instead of * When a block is being read by a client is on the local datanode, instead of
* using {@link DataTransferProtocol} and connect to datanode, * using {@link DataTransferProtocol} and connect to datanode, the short circuit
* the short circuit read allows reading the file directly * read allows reading the file directly from the files on the local file
* from the files on the local file system. * system.
*/ */
public class TestShortCircuitLocalRead { public class TestShortCircuitLocalRead {
private static TemporarySocketDirectory sockDir; private static TemporarySocketDirectory sockDir;
@ -195,8 +195,7 @@ public class TestShortCircuitLocalRead {
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name); HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
ByteBuffer actual = ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
ByteBuffer.allocateDirect(expected.length - readOffset);
IOUtils.skipFully(stm, readOffset); IOUtils.skipFully(stm, readOffset);
@ -231,8 +230,7 @@ public class TestShortCircuitLocalRead {
public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size, public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size,
int readOffset, String shortCircuitUser, String readingUser, int readOffset, String shortCircuitUser, String readingUser,
boolean legacyShortCircuitFails) boolean legacyShortCircuitFails) throws IOException, InterruptedException {
throws IOException, InterruptedException {
doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset, doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
shortCircuitUser, readingUser, legacyShortCircuitFails); shortCircuitUser, readingUser, legacyShortCircuitFails);
} }
@ -249,8 +247,7 @@ public class TestShortCircuitLocalRead {
*/ */
public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size, public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
int readOffset, String shortCircuitUser, String readingUser, int readOffset, String shortCircuitUser, String readingUser,
boolean legacyShortCircuitFails) boolean legacyShortCircuitFails) throws IOException, InterruptedException {
throws IOException, InterruptedException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
@ -265,8 +262,7 @@ public class TestShortCircuitLocalRead {
if (shortCircuitUser != null) { if (shortCircuitUser != null) {
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
shortCircuitUser); shortCircuitUser);
conf.setBoolean( conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
} }
if (simulatedStorage) { if (simulatedStorage) {
SimulatedFSDataset.setFactory(conf); SimulatedFSDataset.setFactory(conf);
@ -328,8 +324,7 @@ public class TestShortCircuitLocalRead {
*/ */
@Test(timeout=60000) @Test(timeout=60000)
public void testLocalReadFallback() throws Exception { public void testLocalReadFallback() throws Exception {
doTestShortCircuitReadLegacy( doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(), "notallowed", true);
true, 13, 0, getCurrentUser(), "notallowed", true);
} }
@Test(timeout=60000) @Test(timeout=60000)
@ -371,9 +366,8 @@ public class TestShortCircuitLocalRead {
ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock()); ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken(); Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0]; final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
ClientDatanodeProtocol proxy = ClientDatanodeProtocol proxy =
DFSUtilClient.createClientDatanodeProtocolProxy( DFSUtilClient.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
dnInfo, conf, 60000, false);
try { try {
proxy.getBlockLocalPathInfo(blk, token); proxy.getBlockLocalPathInfo(blk, token);
Assert.fail("The call should have failed as this user " Assert.fail("The call should have failed as this user "
@ -393,8 +387,7 @@ public class TestShortCircuitLocalRead {
int size = blockSize; int size = blockSize;
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean( conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), new File(sockDir.getDir(),
"testSkipWithVerifyChecksum._PORT.sock").getAbsolutePath()); "testSkipWithVerifyChecksum._PORT.sock").getAbsolutePath());
@ -441,8 +434,7 @@ public class TestShortCircuitLocalRead {
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
conf.setBoolean( conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(), new File(sockDir.getDir(),
"testHandleTruncatedBlockFile._PORT.sock").getAbsolutePath()); "testHandleTruncatedBlockFile._PORT.sock").getAbsolutePath());
@ -531,8 +523,8 @@ public class TestShortCircuitLocalRead {
System.out.println("Usage: test shortcircuit checksum threadCount"); System.out.println("Usage: test shortcircuit checksum threadCount");
System.exit(1); System.exit(1);
} }
boolean shortcircuit = Boolean.parseBoolean(args[0]); boolean shortcircuit = Boolean.valueOf(args[0]);
boolean checksum = Boolean.parseBoolean(args[1]); boolean checksum = Boolean.valueOf(args[1]);
int threadCount = Integer.parseInt(args[2]); int threadCount = Integer.parseInt(args[2]);
// Setup create a file // Setup create a file
@ -543,8 +535,7 @@ public class TestShortCircuitLocalRead {
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
checksum); checksum);
// Override fileSize and DATA_TO_WRITE to // Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
// much larger values for benchmark test
int fileSize = 1000 * blockSize + 100; // File with 1000 blocks int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize); final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
@ -566,8 +557,7 @@ public class TestShortCircuitLocalRead {
for (int i = 0; i < iteration; i++) { for (int i = 0; i < iteration; i++) {
try { try {
String user = getCurrentUser(); String user = getCurrentUser();
checkFileContent( checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf, true);
fs.getUri(), file1, dataToWrite, 0, user, conf, true);
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -600,13 +590,11 @@ public class TestShortCircuitLocalRead {
* through RemoteBlockReader * through RemoteBlockReader
* @throws IOException * @throws IOException
*/ */
public void doTestShortCircuitReadWithRemoteBlockReader( public void doTestShortCircuitReadWithRemoteBlockReader(boolean ignoreChecksum,
boolean ignoreChecksum, int size, String shortCircuitUser, int size, String shortCircuitUser, int readOffset,
int readOffset, boolean shortCircuitFails) boolean shortCircuitFails) throws IOException, InterruptedException {
throws IOException, InterruptedException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean( conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true); conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
@ -615,8 +603,7 @@ public class TestShortCircuitLocalRead {
// check that / exists // check that / exists
Path path = new Path("/"); Path path = new Path("/");
URI uri = cluster.getURI(); URI uri = cluster.getURI();
assertTrue( assertTrue("/ should be a directory", fs.getFileStatus(path).isDirectory());
"/ should be a directory", fs.getFileStatus(path).isDirectory());
byte[] fileData = AppendTestUtil.randomBytes(seed, size); byte[] fileData = AppendTestUtil.randomBytes(seed, size);
Path file1 = new Path("filelocal.dat"); Path file1 = new Path("filelocal.dat");
@ -628,12 +615,10 @@ public class TestShortCircuitLocalRead {
checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser, checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser,
conf, shortCircuitFails); conf, shortCircuitFails);
//RemoteBlockReader have unsupported method read(ByteBuffer bf) //RemoteBlockReader have unsupported method read(ByteBuffer bf)
assertTrue( assertTrue("RemoteBlockReader unsupported method read(ByteBuffer bf) error",
"RemoteBlockReader unsupported method read(ByteBuffer bf) error", checkUnsupportedMethod(fs, file1, fileData, readOffset));
checkUnsupportedMethod(fs, file1, fileData, readOffset));
} catch(IOException e) { } catch(IOException e) {
throw new IOException( throw new IOException("doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
"doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
} catch(InterruptedException inEx) { } catch(InterruptedException inEx) {
throw inEx; throw inEx;
} finally { } finally {
@ -645,8 +630,7 @@ public class TestShortCircuitLocalRead {
private boolean checkUnsupportedMethod(FileSystem fs, Path file, private boolean checkUnsupportedMethod(FileSystem fs, Path file,
byte[] expected, int readOffset) throws IOException { byte[] expected, int readOffset) throws IOException {
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file); HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
ByteBuffer actual = ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
ByteBuffer.allocateDirect(expected.length - readOffset);
IOUtils.skipFully(stm, readOffset); IOUtils.skipFully(stm, readOffset);
try { try {
stm.read(actual); stm.read(actual);

View File

@ -245,63 +245,6 @@ import org.junit.Test;
verify(fs).delete(stagingJobPath, true); verify(fs).delete(stagingJobPath, true);
} }
@Test
public void testByPreserveFailedStaging() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
// Failed task's staging files should be kept
conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true);
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
//Staging Dir exists
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
JobStateInternal.FAILED, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
//test whether notifyIsLastAMRetry called
Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
verify(fs, times(0)).delete(stagingJobPath, true);
}
@Test
public void testPreservePatternMatchedStaging() throws IOException {
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
// The staging files that are matched to the pattern
// should not be deleted
conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir");
fs = mock(FileSystem.class);
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
//Staging Dir exists
String user = UserGroupInformation.getCurrentUser().getShortUserName();
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
when(fs.exists(stagingDir)).thenReturn(true);
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
0);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
appMaster.start();
appMaster.shutDownJob();
//test whether notifyIsLastAMRetry called
Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
verify(fs, times(0)).delete(stagingJobPath, true);
}
private class TestMRApp extends MRAppMaster { private class TestMRApp extends MRAppMaster {
ContainerAllocator allocator; ContainerAllocator allocator;
boolean testIsLastAMRetry = false; boolean testIsLastAMRetry = false;