HDFS-9686. Remove useless boxing/unboxing code. Contributed by Kousuke Saruta.
This commit is contained in:
parent
cfa8513890
commit
fe124da5ff
|
@ -36,7 +36,9 @@ import java.text.MessageFormat;
|
||||||
* and uses its lifecycle to start and stop the server.
|
* and uses its lifecycle to start and stop the server.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public abstract class ServerWebApp extends Server implements ServletContextListener {
|
public abstract class ServerWebApp
|
||||||
|
extends Server
|
||||||
|
implements ServletContextListener {
|
||||||
|
|
||||||
private static final String HOME_DIR = ".home.dir";
|
private static final String HOME_DIR = ".home.dir";
|
||||||
private static final String CONFIG_DIR = ".config.dir";
|
private static final String CONFIG_DIR = ".config.dir";
|
||||||
|
@ -61,8 +63,8 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
|
||||||
/**
|
/**
|
||||||
* Constructor for testing purposes.
|
* Constructor for testing purposes.
|
||||||
*/
|
*/
|
||||||
protected ServerWebApp(String name, String homeDir, String configDir, String logDir, String tempDir,
|
protected ServerWebApp(String name, String homeDir, String configDir,
|
||||||
Configuration config) {
|
String logDir, String tempDir, Configuration config) {
|
||||||
super(name, homeDir, configDir, logDir, tempDir, config);
|
super(name, homeDir, configDir, logDir, tempDir, config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,7 +122,8 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
|
||||||
String sysProp = name + HOME_DIR;
|
String sysProp = name + HOME_DIR;
|
||||||
homeDir = System.getProperty(sysProp);
|
homeDir = System.getProperty(sysProp);
|
||||||
if (homeDir == null) {
|
if (homeDir == null) {
|
||||||
throw new IllegalArgumentException(MessageFormat.format("System property [{0}] not defined", sysProp));
|
throw new IllegalArgumentException(MessageFormat.format(
|
||||||
|
"System property [{0}] not defined", sysProp));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return homeDir;
|
return homeDir;
|
||||||
|
@ -160,7 +163,8 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Resolves the host and port InetSocketAddress the web server is listening to.
|
* Resolves the host and port InetSocketAddress the
|
||||||
|
* web server is listening to.
|
||||||
* <p>
|
* <p>
|
||||||
* This implementation looks for the following 2 properties:
|
* This implementation looks for the following 2 properties:
|
||||||
* <ul>
|
* <ul>
|
||||||
|
@ -168,8 +172,10 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
|
||||||
* <li>#SERVER_NAME#.http.port</li>
|
* <li>#SERVER_NAME#.http.port</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* @return the host and port InetSocketAddress the web server is listening to.
|
* @return the host and port InetSocketAddress the
|
||||||
* @throws ServerException thrown if any of the above 2 properties is not defined.
|
* web server is listening to.
|
||||||
|
* @throws ServerException thrown
|
||||||
|
* if any of the above 2 properties is not defined.
|
||||||
*/
|
*/
|
||||||
protected InetSocketAddress resolveAuthority() throws ServerException {
|
protected InetSocketAddress resolveAuthority() throws ServerException {
|
||||||
String hostnameKey = getName() + HTTP_HOSTNAME;
|
String hostnameKey = getName() + HTTP_HOSTNAME;
|
||||||
|
@ -233,6 +239,7 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public boolean isSslEnabled() {
|
public boolean isSslEnabled() {
|
||||||
return Boolean.valueOf(System.getProperty(getName() + SSL_ENABLED, "false"));
|
return Boolean.parseBoolean(
|
||||||
|
System.getProperty(getName() + SSL_ENABLED, "false"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1954,6 +1954,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-9726. Refactor IBR code to a new class. (szetszwo)
|
HDFS-9726. Refactor IBR code to a new class. (szetszwo)
|
||||||
|
|
||||||
|
HDFS-9686. Remove useless boxing/unboxing code.
|
||||||
|
(Kousuke Saruta via aajisaka)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.
|
HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.
|
||||||
|
|
|
@ -414,7 +414,9 @@ public abstract class FSEditLogOp {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp {
|
static abstract class AddCloseOp
|
||||||
|
extends FSEditLogOp
|
||||||
|
implements BlockListUpdatingOp {
|
||||||
int length;
|
int length;
|
||||||
long inodeId;
|
long inodeId;
|
||||||
String path;
|
String path;
|
||||||
|
@ -635,7 +637,8 @@ public abstract class FSEditLogOp {
|
||||||
NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) {
|
NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) {
|
||||||
this.storagePolicyId = FSImageSerialization.readByte(in);
|
this.storagePolicyId = FSImageSerialization.readByte(in);
|
||||||
} else {
|
} else {
|
||||||
this.storagePolicyId = HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
|
this.storagePolicyId =
|
||||||
|
HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
|
||||||
}
|
}
|
||||||
// read clientId and callId
|
// read clientId and callId
|
||||||
readRpcIds(in, logVersion);
|
readRpcIds(in, logVersion);
|
||||||
|
@ -715,7 +718,7 @@ public abstract class FSEditLogOp {
|
||||||
Long.toString(inodeId));
|
Long.toString(inodeId));
|
||||||
XMLUtils.addSaxString(contentHandler, "PATH", path);
|
XMLUtils.addSaxString(contentHandler, "PATH", path);
|
||||||
XMLUtils.addSaxString(contentHandler, "REPLICATION",
|
XMLUtils.addSaxString(contentHandler, "REPLICATION",
|
||||||
Short.valueOf(replication).toString());
|
Short.toString(replication));
|
||||||
XMLUtils.addSaxString(contentHandler, "MTIME",
|
XMLUtils.addSaxString(contentHandler, "MTIME",
|
||||||
Long.toString(mtime));
|
Long.toString(mtime));
|
||||||
XMLUtils.addSaxString(contentHandler, "ATIME",
|
XMLUtils.addSaxString(contentHandler, "ATIME",
|
||||||
|
@ -743,7 +746,7 @@ public abstract class FSEditLogOp {
|
||||||
this.length = Integer.parseInt(st.getValue("LENGTH"));
|
this.length = Integer.parseInt(st.getValue("LENGTH"));
|
||||||
this.inodeId = Long.parseLong(st.getValue("INODEID"));
|
this.inodeId = Long.parseLong(st.getValue("INODEID"));
|
||||||
this.path = st.getValue("PATH");
|
this.path = st.getValue("PATH");
|
||||||
this.replication = Short.valueOf(st.getValue("REPLICATION"));
|
this.replication = Short.parseShort(st.getValue("REPLICATION"));
|
||||||
this.mtime = Long.parseLong(st.getValue("MTIME"));
|
this.mtime = Long.parseLong(st.getValue("MTIME"));
|
||||||
this.atime = Long.parseLong(st.getValue("ATIME"));
|
this.atime = Long.parseLong(st.getValue("ATIME"));
|
||||||
this.blockSize = Long.parseLong(st.getValue("BLOCKSIZE"));
|
this.blockSize = Long.parseLong(st.getValue("BLOCKSIZE"));
|
||||||
|
@ -1184,12 +1187,12 @@ public abstract class FSEditLogOp {
|
||||||
protected void toXml(ContentHandler contentHandler) throws SAXException {
|
protected void toXml(ContentHandler contentHandler) throws SAXException {
|
||||||
XMLUtils.addSaxString(contentHandler, "PATH", path);
|
XMLUtils.addSaxString(contentHandler, "PATH", path);
|
||||||
XMLUtils.addSaxString(contentHandler, "REPLICATION",
|
XMLUtils.addSaxString(contentHandler, "REPLICATION",
|
||||||
Short.valueOf(replication).toString());
|
Short.toString(replication));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override void fromXml(Stanza st) throws InvalidXmlException {
|
@Override void fromXml(Stanza st) throws InvalidXmlException {
|
||||||
this.path = st.getValue("PATH");
|
this.path = st.getValue("PATH");
|
||||||
this.replication = Short.valueOf(st.getValue("REPLICATION"));
|
this.replication = Short.parseShort(st.getValue("REPLICATION"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1977,13 +1980,13 @@ public abstract class FSEditLogOp {
|
||||||
protected void toXml(ContentHandler contentHandler) throws SAXException {
|
protected void toXml(ContentHandler contentHandler) throws SAXException {
|
||||||
XMLUtils.addSaxString(contentHandler, "SRC", src);
|
XMLUtils.addSaxString(contentHandler, "SRC", src);
|
||||||
XMLUtils.addSaxString(contentHandler, "MODE",
|
XMLUtils.addSaxString(contentHandler, "MODE",
|
||||||
Short.valueOf(permissions.toShort()).toString());
|
Short.toString(permissions.toShort()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override void fromXml(Stanza st) throws InvalidXmlException {
|
@Override void fromXml(Stanza st) throws InvalidXmlException {
|
||||||
this.src = st.getValue("SRC");
|
this.src = st.getValue("SRC");
|
||||||
this.permissions = new FsPermission(
|
this.permissions = new FsPermission(
|
||||||
Short.valueOf(st.getValue("MODE")));
|
Short.parseShort(st.getValue("MODE")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4467,13 +4470,13 @@ public abstract class FSEditLogOp {
|
||||||
protected void toXml(ContentHandler contentHandler) throws SAXException {
|
protected void toXml(ContentHandler contentHandler) throws SAXException {
|
||||||
XMLUtils.addSaxString(contentHandler, "PATH", path);
|
XMLUtils.addSaxString(contentHandler, "PATH", path);
|
||||||
XMLUtils.addSaxString(contentHandler, "POLICYID",
|
XMLUtils.addSaxString(contentHandler, "POLICYID",
|
||||||
Byte.valueOf(policyId).toString());
|
Byte.toString(policyId));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
void fromXml(Stanza st) throws InvalidXmlException {
|
void fromXml(Stanza st) throws InvalidXmlException {
|
||||||
this.path = st.getValue("PATH");
|
this.path = st.getValue("PATH");
|
||||||
this.policyId = Byte.valueOf(st.getValue("POLICYID"));
|
this.policyId = Byte.parseByte(st.getValue("POLICYID"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4950,7 +4953,8 @@ public abstract class FSEditLogOp {
|
||||||
|
|
||||||
public static void delegationTokenToXml(ContentHandler contentHandler,
|
public static void delegationTokenToXml(ContentHandler contentHandler,
|
||||||
DelegationTokenIdentifier token) throws SAXException {
|
DelegationTokenIdentifier token) throws SAXException {
|
||||||
contentHandler.startElement("", "", "DELEGATION_TOKEN_IDENTIFIER", new AttributesImpl());
|
contentHandler.startElement(
|
||||||
|
"", "", "DELEGATION_TOKEN_IDENTIFIER", new AttributesImpl());
|
||||||
XMLUtils.addSaxString(contentHandler, "KIND", token.getKind().toString());
|
XMLUtils.addSaxString(contentHandler, "KIND", token.getKind().toString());
|
||||||
XMLUtils.addSaxString(contentHandler, "SEQUENCE_NUMBER",
|
XMLUtils.addSaxString(contentHandler, "SEQUENCE_NUMBER",
|
||||||
Integer.toString(token.getSequenceNumber()));
|
Integer.toString(token.getSequenceNumber()));
|
||||||
|
@ -4996,7 +5000,8 @@ public abstract class FSEditLogOp {
|
||||||
|
|
||||||
public static void delegationKeyToXml(ContentHandler contentHandler,
|
public static void delegationKeyToXml(ContentHandler contentHandler,
|
||||||
DelegationKey key) throws SAXException {
|
DelegationKey key) throws SAXException {
|
||||||
contentHandler.startElement("", "", "DELEGATION_KEY", new AttributesImpl());
|
contentHandler.startElement(
|
||||||
|
"", "", "DELEGATION_KEY", new AttributesImpl());
|
||||||
XMLUtils.addSaxString(contentHandler, "KEY_ID",
|
XMLUtils.addSaxString(contentHandler, "KEY_ID",
|
||||||
Integer.toString(key.getKeyId()));
|
Integer.toString(key.getKeyId()));
|
||||||
XMLUtils.addSaxString(contentHandler, "EXPIRY_DATE",
|
XMLUtils.addSaxString(contentHandler, "EXPIRY_DATE",
|
||||||
|
@ -5024,7 +5029,8 @@ public abstract class FSEditLogOp {
|
||||||
|
|
||||||
public static void permissionStatusToXml(ContentHandler contentHandler,
|
public static void permissionStatusToXml(ContentHandler contentHandler,
|
||||||
PermissionStatus perm) throws SAXException {
|
PermissionStatus perm) throws SAXException {
|
||||||
contentHandler.startElement("", "", "PERMISSION_STATUS", new AttributesImpl());
|
contentHandler.startElement(
|
||||||
|
"", "", "PERMISSION_STATUS", new AttributesImpl());
|
||||||
XMLUtils.addSaxString(contentHandler, "USERNAME", perm.getUserName());
|
XMLUtils.addSaxString(contentHandler, "USERNAME", perm.getUserName());
|
||||||
XMLUtils.addSaxString(contentHandler, "GROUPNAME", perm.getGroupName());
|
XMLUtils.addSaxString(contentHandler, "GROUPNAME", perm.getGroupName());
|
||||||
fsPermissionToXml(contentHandler, perm.getPermission());
|
fsPermissionToXml(contentHandler, perm.getPermission());
|
||||||
|
@ -5042,13 +5048,13 @@ public abstract class FSEditLogOp {
|
||||||
|
|
||||||
public static void fsPermissionToXml(ContentHandler contentHandler,
|
public static void fsPermissionToXml(ContentHandler contentHandler,
|
||||||
FsPermission mode) throws SAXException {
|
FsPermission mode) throws SAXException {
|
||||||
XMLUtils.addSaxString(contentHandler, "MODE", Short.valueOf(mode.toShort())
|
XMLUtils.addSaxString(contentHandler, "MODE",
|
||||||
.toString());
|
Short.toString(mode.toShort()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static FsPermission fsPermissionFromXml(Stanza st)
|
public static FsPermission fsPermissionFromXml(Stanza st)
|
||||||
throws InvalidXmlException {
|
throws InvalidXmlException {
|
||||||
short mode = Short.valueOf(st.getValue("MODE"));
|
short mode = Short.parseShort(st.getValue("MODE"));
|
||||||
return new FsPermission(mode);
|
return new FsPermission(mode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5057,7 +5063,8 @@ public abstract class FSEditLogOp {
|
||||||
XMLUtils.addSaxString(contentHandler, "PERM", v.SYMBOL);
|
XMLUtils.addSaxString(contentHandler, "PERM", v.SYMBOL);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static FsAction fsActionFromXml(Stanza st) throws InvalidXmlException {
|
private static FsAction fsActionFromXml(Stanza st)
|
||||||
|
throws InvalidXmlException {
|
||||||
FsAction v = FSACTION_SYMBOL_MAP.get(st.getValue("PERM"));
|
FsAction v = FSACTION_SYMBOL_MAP.get(st.getValue("PERM"));
|
||||||
if (v == null)
|
if (v == null)
|
||||||
throw new InvalidXmlException("Invalid value for FsAction");
|
throw new InvalidXmlException("Invalid value for FsAction");
|
||||||
|
|
|
@ -173,7 +173,8 @@ public class RollingWindowManager {
|
||||||
* @param user the user that updated the metric
|
* @param user the user that updated the metric
|
||||||
* @param delta the amount of change in the metric, e.g., +1
|
* @param delta the amount of change in the metric, e.g., +1
|
||||||
*/
|
*/
|
||||||
public void recordMetric(long time, String command, String user, long delta) {
|
public void recordMetric(long time, String command,
|
||||||
|
String user, long delta) {
|
||||||
RollingWindow window = getRollingWindow(command, user);
|
RollingWindow window = getRollingWindow(command, user);
|
||||||
window.incAt(time, delta);
|
window.incAt(time, delta);
|
||||||
}
|
}
|
||||||
|
@ -208,7 +209,7 @@ public class RollingWindowManager {
|
||||||
}
|
}
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
NameValuePair userEntry = reverse.pop();
|
NameValuePair userEntry = reverse.pop();
|
||||||
User user = new User(userEntry.name, Long.valueOf(userEntry.value));
|
User user = new User(userEntry.name, userEntry.value);
|
||||||
op.addUser(user);
|
op.addUser(user);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -243,7 +244,8 @@ public class RollingWindowManager {
|
||||||
metricName, userName, windowSum);
|
metricName, userName, windowSum);
|
||||||
topN.offer(new NameValuePair(userName, windowSum));
|
topN.offer(new NameValuePair(userName, windowSum));
|
||||||
}
|
}
|
||||||
LOG.debug("topN users size for command {} is: {}", metricName, topN.size());
|
LOG.debug("topN users size for command {} is: {}",
|
||||||
|
metricName, topN.size());
|
||||||
return topN;
|
return topN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,8 @@ import java.util.concurrent.TimeUnit;
|
||||||
* For example, if the user wants to test reading 1024MB data with 10 clients,
|
* For example, if the user wants to test reading 1024MB data with 10 clients,
|
||||||
* he/she should firstly generate 1024MB data with 10 (or more) clients.
|
* he/she should firstly generate 1024MB data with 10 (or more) clients.
|
||||||
*/
|
*/
|
||||||
public class ErasureCodeBenchmarkThroughput extends Configured implements Tool {
|
public class ErasureCodeBenchmarkThroughput
|
||||||
|
extends Configured implements Tool {
|
||||||
|
|
||||||
private static final int BUFFER_SIZE_MB = 128;
|
private static final int BUFFER_SIZE_MB = 128;
|
||||||
private static final String DFS_TMP_DIR = System.getProperty(
|
private static final String DFS_TMP_DIR = System.getProperty(
|
||||||
|
@ -114,13 +115,15 @@ public class ErasureCodeBenchmarkThroughput extends Configured implements Tool {
|
||||||
System.out.println(msg);
|
System.out.println(msg);
|
||||||
}
|
}
|
||||||
System.err.println("Usage: ErasureCodeBenchmarkThroughput " +
|
System.err.println("Usage: ErasureCodeBenchmarkThroughput " +
|
||||||
"<read|write|gen|clean> <size in MB> <ec|rep> [num clients] [stf|pos]\n"
|
"<read|write|gen|clean> <size in MB> " +
|
||||||
+ "Stateful and positional option is only available for read.");
|
"<ec|rep> [num clients] [stf|pos]\n" +
|
||||||
|
"Stateful and positional option is only available for read.");
|
||||||
System.exit(1);
|
System.exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Long> doBenchmark(boolean isRead, int dataSizeMB, int numClients,
|
private List<Long> doBenchmark(boolean isRead, int dataSizeMB,
|
||||||
boolean isEc, boolean statefulRead, boolean isGen) throws Exception {
|
int numClients, boolean isEc, boolean statefulRead, boolean isGen)
|
||||||
|
throws Exception {
|
||||||
CompletionService<Long> cs = new ExecutorCompletionService<Long>(
|
CompletionService<Long> cs = new ExecutorCompletionService<Long>(
|
||||||
Executors.newFixedThreadPool(numClients));
|
Executors.newFixedThreadPool(numClients));
|
||||||
for (int i = 0; i < numClients; i++) {
|
for (int i = 0; i < numClients; i++) {
|
||||||
|
@ -217,7 +220,7 @@ public class ErasureCodeBenchmarkThroughput extends Configured implements Tool {
|
||||||
printUsage("Unknown operation: " + args[0]);
|
printUsage("Unknown operation: " + args[0]);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
dataSizeMB = Integer.valueOf(args[1]);
|
dataSizeMB = Integer.parseInt(args[1]);
|
||||||
if (dataSizeMB <= 0) {
|
if (dataSizeMB <= 0) {
|
||||||
printUsage("Invalid data size: " + dataSizeMB);
|
printUsage("Invalid data size: " + dataSizeMB);
|
||||||
}
|
}
|
||||||
|
@ -233,7 +236,7 @@ public class ErasureCodeBenchmarkThroughput extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
if (args.length >= 4 && type != OpType.CLEAN) {
|
if (args.length >= 4 && type != OpType.CLEAN) {
|
||||||
try {
|
try {
|
||||||
numClients = Integer.valueOf(args[3]);
|
numClients = Integer.parseInt(args[3]);
|
||||||
if (numClients <= 0) {
|
if (numClients <= 0) {
|
||||||
printUsage("Invalid num of clients: " + numClients);
|
printUsage("Invalid num of clients: " + numClients);
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,9 +72,9 @@ import org.junit.Test;
|
||||||
/**
|
/**
|
||||||
* Test for short circuit read functionality using {@link BlockReaderLocal}.
|
* Test for short circuit read functionality using {@link BlockReaderLocal}.
|
||||||
* When a block is being read by a client is on the local datanode, instead of
|
* When a block is being read by a client is on the local datanode, instead of
|
||||||
* using {@link DataTransferProtocol} and connect to datanode, the short circuit
|
* using {@link DataTransferProtocol} and connect to datanode,
|
||||||
* read allows reading the file directly from the files on the local file
|
* the short circuit read allows reading the file directly
|
||||||
* system.
|
* from the files on the local file system.
|
||||||
*/
|
*/
|
||||||
public class TestShortCircuitLocalRead {
|
public class TestShortCircuitLocalRead {
|
||||||
private static TemporarySocketDirectory sockDir;
|
private static TemporarySocketDirectory sockDir;
|
||||||
|
@ -195,7 +195,8 @@ public class TestShortCircuitLocalRead {
|
||||||
|
|
||||||
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
|
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(name);
|
||||||
|
|
||||||
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
|
ByteBuffer actual =
|
||||||
|
ByteBuffer.allocateDirect(expected.length - readOffset);
|
||||||
|
|
||||||
IOUtils.skipFully(stm, readOffset);
|
IOUtils.skipFully(stm, readOffset);
|
||||||
|
|
||||||
|
@ -230,7 +231,8 @@ public class TestShortCircuitLocalRead {
|
||||||
|
|
||||||
public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size,
|
public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size,
|
||||||
int readOffset, String shortCircuitUser, String readingUser,
|
int readOffset, String shortCircuitUser, String readingUser,
|
||||||
boolean legacyShortCircuitFails) throws IOException, InterruptedException {
|
boolean legacyShortCircuitFails)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
|
doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset,
|
||||||
shortCircuitUser, readingUser, legacyShortCircuitFails);
|
shortCircuitUser, readingUser, legacyShortCircuitFails);
|
||||||
}
|
}
|
||||||
|
@ -247,7 +249,8 @@ public class TestShortCircuitLocalRead {
|
||||||
*/
|
*/
|
||||||
public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
|
public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size,
|
||||||
int readOffset, String shortCircuitUser, String readingUser,
|
int readOffset, String shortCircuitUser, String readingUser,
|
||||||
boolean legacyShortCircuitFails) throws IOException, InterruptedException {
|
boolean legacyShortCircuitFails)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
|
@ -262,7 +265,8 @@ public class TestShortCircuitLocalRead {
|
||||||
if (shortCircuitUser != null) {
|
if (shortCircuitUser != null) {
|
||||||
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
||||||
shortCircuitUser);
|
shortCircuitUser);
|
||||||
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
conf.setBoolean(
|
||||||
|
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
||||||
}
|
}
|
||||||
if (simulatedStorage) {
|
if (simulatedStorage) {
|
||||||
SimulatedFSDataset.setFactory(conf);
|
SimulatedFSDataset.setFactory(conf);
|
||||||
|
@ -324,7 +328,8 @@ public class TestShortCircuitLocalRead {
|
||||||
*/
|
*/
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void testLocalReadFallback() throws Exception {
|
public void testLocalReadFallback() throws Exception {
|
||||||
doTestShortCircuitReadLegacy(true, 13, 0, getCurrentUser(), "notallowed", true);
|
doTestShortCircuitReadLegacy(
|
||||||
|
true, 13, 0, getCurrentUser(), "notallowed", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
|
@ -367,7 +372,8 @@ public class TestShortCircuitLocalRead {
|
||||||
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
|
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
|
||||||
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
|
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
|
||||||
ClientDatanodeProtocol proxy =
|
ClientDatanodeProtocol proxy =
|
||||||
DFSUtilClient.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
|
DFSUtilClient.createClientDatanodeProtocolProxy(
|
||||||
|
dnInfo, conf, 60000, false);
|
||||||
try {
|
try {
|
||||||
proxy.getBlockLocalPathInfo(blk, token);
|
proxy.getBlockLocalPathInfo(blk, token);
|
||||||
Assert.fail("The call should have failed as this user "
|
Assert.fail("The call should have failed as this user "
|
||||||
|
@ -387,7 +393,8 @@ public class TestShortCircuitLocalRead {
|
||||||
int size = blockSize;
|
int size = blockSize;
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
conf.setBoolean(
|
||||||
|
HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
new File(sockDir.getDir(),
|
new File(sockDir.getDir(),
|
||||||
"testSkipWithVerifyChecksum._PORT.sock").getAbsolutePath());
|
"testSkipWithVerifyChecksum._PORT.sock").getAbsolutePath());
|
||||||
|
@ -434,7 +441,8 @@ public class TestShortCircuitLocalRead {
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
conf.setBoolean(
|
||||||
|
HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY, false);
|
||||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||||
new File(sockDir.getDir(),
|
new File(sockDir.getDir(),
|
||||||
"testHandleTruncatedBlockFile._PORT.sock").getAbsolutePath());
|
"testHandleTruncatedBlockFile._PORT.sock").getAbsolutePath());
|
||||||
|
@ -523,8 +531,8 @@ public class TestShortCircuitLocalRead {
|
||||||
System.out.println("Usage: test shortcircuit checksum threadCount");
|
System.out.println("Usage: test shortcircuit checksum threadCount");
|
||||||
System.exit(1);
|
System.exit(1);
|
||||||
}
|
}
|
||||||
boolean shortcircuit = Boolean.valueOf(args[0]);
|
boolean shortcircuit = Boolean.parseBoolean(args[0]);
|
||||||
boolean checksum = Boolean.valueOf(args[1]);
|
boolean checksum = Boolean.parseBoolean(args[1]);
|
||||||
int threadCount = Integer.parseInt(args[2]);
|
int threadCount = Integer.parseInt(args[2]);
|
||||||
|
|
||||||
// Setup create a file
|
// Setup create a file
|
||||||
|
@ -535,7 +543,8 @@ public class TestShortCircuitLocalRead {
|
||||||
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.SKIP_CHECKSUM_KEY,
|
||||||
checksum);
|
checksum);
|
||||||
|
|
||||||
// Override fileSize and DATA_TO_WRITE to much larger values for benchmark test
|
// Override fileSize and DATA_TO_WRITE to
|
||||||
|
// much larger values for benchmark test
|
||||||
int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
|
int fileSize = 1000 * blockSize + 100; // File with 1000 blocks
|
||||||
final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
|
final byte [] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
|
||||||
|
|
||||||
|
@ -557,7 +566,8 @@ public class TestShortCircuitLocalRead {
|
||||||
for (int i = 0; i < iteration; i++) {
|
for (int i = 0; i < iteration; i++) {
|
||||||
try {
|
try {
|
||||||
String user = getCurrentUser();
|
String user = getCurrentUser();
|
||||||
checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf, true);
|
checkFileContent(
|
||||||
|
fs.getUri(), file1, dataToWrite, 0, user, conf, true);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -590,11 +600,13 @@ public class TestShortCircuitLocalRead {
|
||||||
* through RemoteBlockReader
|
* through RemoteBlockReader
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void doTestShortCircuitReadWithRemoteBlockReader(boolean ignoreChecksum,
|
public void doTestShortCircuitReadWithRemoteBlockReader(
|
||||||
int size, String shortCircuitUser, int readOffset,
|
boolean ignoreChecksum, int size, String shortCircuitUser,
|
||||||
boolean shortCircuitFails) throws IOException, InterruptedException {
|
int readOffset, boolean shortCircuitFails)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean(HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
|
conf.setBoolean(
|
||||||
|
HdfsClientConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
|
||||||
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
|
||||||
|
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
|
||||||
|
@ -603,7 +615,8 @@ public class TestShortCircuitLocalRead {
|
||||||
// check that / exists
|
// check that / exists
|
||||||
Path path = new Path("/");
|
Path path = new Path("/");
|
||||||
URI uri = cluster.getURI();
|
URI uri = cluster.getURI();
|
||||||
assertTrue("/ should be a directory", fs.getFileStatus(path).isDirectory());
|
assertTrue(
|
||||||
|
"/ should be a directory", fs.getFileStatus(path).isDirectory());
|
||||||
|
|
||||||
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
||||||
Path file1 = new Path("filelocal.dat");
|
Path file1 = new Path("filelocal.dat");
|
||||||
|
@ -615,10 +628,12 @@ public class TestShortCircuitLocalRead {
|
||||||
checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser,
|
checkFileContent(uri, file1, fileData, readOffset, shortCircuitUser,
|
||||||
conf, shortCircuitFails);
|
conf, shortCircuitFails);
|
||||||
//RemoteBlockReader have unsupported method read(ByteBuffer bf)
|
//RemoteBlockReader have unsupported method read(ByteBuffer bf)
|
||||||
assertTrue("RemoteBlockReader unsupported method read(ByteBuffer bf) error",
|
assertTrue(
|
||||||
|
"RemoteBlockReader unsupported method read(ByteBuffer bf) error",
|
||||||
checkUnsupportedMethod(fs, file1, fileData, readOffset));
|
checkUnsupportedMethod(fs, file1, fileData, readOffset));
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
throw new IOException("doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
|
throw new IOException(
|
||||||
|
"doTestShortCircuitReadWithRemoteBlockReader ex error ", e);
|
||||||
} catch(InterruptedException inEx) {
|
} catch(InterruptedException inEx) {
|
||||||
throw inEx;
|
throw inEx;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -630,7 +645,8 @@ public class TestShortCircuitLocalRead {
|
||||||
private boolean checkUnsupportedMethod(FileSystem fs, Path file,
|
private boolean checkUnsupportedMethod(FileSystem fs, Path file,
|
||||||
byte[] expected, int readOffset) throws IOException {
|
byte[] expected, int readOffset) throws IOException {
|
||||||
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
|
HdfsDataInputStream stm = (HdfsDataInputStream)fs.open(file);
|
||||||
ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
|
ByteBuffer actual =
|
||||||
|
ByteBuffer.allocateDirect(expected.length - readOffset);
|
||||||
IOUtils.skipFully(stm, readOffset);
|
IOUtils.skipFully(stm, readOffset);
|
||||||
try {
|
try {
|
||||||
stm.read(actual);
|
stm.read(actual);
|
||||||
|
|
|
@ -245,6 +245,63 @@ import org.junit.Test;
|
||||||
verify(fs).delete(stagingJobPath, true);
|
verify(fs).delete(stagingJobPath, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testByPreserveFailedStaging() throws IOException {
|
||||||
|
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
|
||||||
|
// Failed task's staging files should be kept
|
||||||
|
conf.setBoolean(MRJobConfig.PRESERVE_FAILED_TASK_FILES, true);
|
||||||
|
fs = mock(FileSystem.class);
|
||||||
|
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
|
||||||
|
//Staging Dir exists
|
||||||
|
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
|
||||||
|
when(fs.exists(stagingDir)).thenReturn(true);
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
|
||||||
|
0);
|
||||||
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
JobId jobid = recordFactory.newRecordInstance(JobId.class);
|
||||||
|
jobid.setAppId(appId);
|
||||||
|
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
|
||||||
|
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
|
||||||
|
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
|
||||||
|
JobStateInternal.FAILED, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
|
||||||
|
appMaster.init(conf);
|
||||||
|
appMaster.start();
|
||||||
|
appMaster.shutDownJob();
|
||||||
|
//test whether notifyIsLastAMRetry called
|
||||||
|
Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
|
||||||
|
verify(fs, times(0)).delete(stagingJobPath, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreservePatternMatchedStaging() throws IOException {
|
||||||
|
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
|
||||||
|
// The staging files that are matched to the pattern
|
||||||
|
// should not be deleted
|
||||||
|
conf.set(MRJobConfig.PRESERVE_FILES_PATTERN, "JobDir");
|
||||||
|
fs = mock(FileSystem.class);
|
||||||
|
when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
|
||||||
|
//Staging Dir exists
|
||||||
|
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||||
|
Path stagingDir = MRApps.getStagingAreaDir(conf, user);
|
||||||
|
when(fs.exists(stagingDir)).thenReturn(true);
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
|
||||||
|
0);
|
||||||
|
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
JobId jobid = recordFactory.newRecordInstance(JobId.class);
|
||||||
|
jobid.setAppId(appId);
|
||||||
|
ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
|
||||||
|
Assert.assertTrue(MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS > 1);
|
||||||
|
MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
|
||||||
|
JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
|
||||||
|
appMaster.init(conf);
|
||||||
|
appMaster.start();
|
||||||
|
appMaster.shutDownJob();
|
||||||
|
//test whether notifyIsLastAMRetry called
|
||||||
|
Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
|
||||||
|
verify(fs, times(0)).delete(stagingJobPath, true);
|
||||||
|
}
|
||||||
|
|
||||||
private class TestMRApp extends MRAppMaster {
|
private class TestMRApp extends MRAppMaster {
|
||||||
ContainerAllocator allocator;
|
ContainerAllocator allocator;
|
||||||
boolean testIsLastAMRetry = false;
|
boolean testIsLastAMRetry = false;
|
||||||
|
|
Loading…
Reference in New Issue