builder();
+
+ // Deprecated opcodes
+ b.add(FSEditLogOpCodes.OP_DATANODE_ADD)
+ .add(FSEditLogOpCodes.OP_DATANODE_REMOVE)
+ .add(FSEditLogOpCodes.OP_SET_NS_QUOTA)
+ .add(FSEditLogOpCodes.OP_CLEAR_NS_QUOTA)
+ .add(FSEditLogOpCodes.OP_SET_GENSTAMP_V1);
+
+ // Cannot test delegation-token-related code in an insecure setup
+ b.add(FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN)
+ .add(FSEditLogOpCodes.OP_RENEW_DELEGATION_TOKEN)
+ .add(FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN);
+
+ // Skip invalid opcode
+ b.add(FSEditLogOpCodes.OP_INVALID);
+ return b.build();
}
- /**
- * Initialize missingOpcodes
- *
- * Opcodes that are not available except after uprade from
- * an older version. We don't test these here.
- */
- private static void initializeMissingOpCodes() {
- obsoleteOpCodes.put(FSEditLogOpCodes.OP_SET_GENSTAMP_V1, true);
- }
+ @Rule
+ public final TemporaryFolder folder = new TemporaryFolder();
@Before
- public void setup() {
- new File(cacheDir).mkdirs();
+ public void setUp() throws IOException {
+ nnHelper.startCluster(buildDir + "/dfs/");
}
-
+
+ @After
+ public void tearDown() throws IOException {
+ nnHelper.shutdownCluster();
+ }
+
/**
* Test the OfflineEditsViewer
*/
@Test
public void testGenerated() throws IOException {
-
- LOG.info("START - testing with generated edits");
-
- nnHelper.startCluster(buildDir + "/dfs/");
-
// edits generated by nnHelper (MiniDFSCluster), should have all op codes
// binary, XML, reparsed binary
- String edits = nnHelper.generateEdits();
- String editsParsedXml = cacheDir + "/editsParsed.xml";
- String editsReparsed = cacheDir + "/editsReparsed";
+ String edits = nnHelper.generateEdits();
+ String editsParsedXml = folder.newFile("editsParsed.xml").getAbsolutePath();
+ String editsReparsed = folder.newFile("editsReparsed").getAbsolutePath();
// parse to XML then back to binary
assertEquals(0, runOev(edits, editsParsedXml, "xml", false));
assertEquals(0, runOev(editsParsedXml, editsReparsed, "binary", false));
// judgment time
+ assertTrue("Edits " + edits + " should have all op codes",
+ hasAllOpCodes(edits));
+ LOG.info("Comparing generated file " + editsReparsed
+ + " with reference file " + edits);
assertTrue(
- "Edits " + edits + " should have all op codes",
- hasAllOpCodes(edits));
- LOG.info("Comparing generated file " + editsReparsed +
- " with reference file " + edits);
- assertTrue(
- "Generated edits and reparsed (bin to XML to bin) should be same",
- filesEqualIgnoreTrailingZeros(edits, editsReparsed));
-
- // removes edits so do this at the end
- nnHelper.shutdownCluster();
-
- LOG.info("END");
+ "Generated edits and reparsed (bin to XML to bin) should be same",
+ filesEqualIgnoreTrailingZeros(edits, editsReparsed));
}
@Test
public void testRecoveryMode() throws IOException {
- LOG.info("START - testing with generated edits");
-
- nnHelper.startCluster(buildDir + "/dfs/");
-
// edits generated by nnHelper (MiniDFSCluster), should have all op codes
// binary, XML, reparsed binary
- String edits = nnHelper.generateEdits();
-
+ String edits = nnHelper.generateEdits();
+ FileOutputStream os = new FileOutputStream(edits, true);
// Corrupt the file by truncating the end
- FileChannel editsFile = new FileOutputStream(edits, true).getChannel();
+ FileChannel editsFile = os.getChannel();
editsFile.truncate(editsFile.size() - 5);
+ os.close();
-
- String editsParsedXml = cacheDir + "/editsRecoveredParsed.xml";
- String editsReparsed = cacheDir + "/editsRecoveredReparsed";
- String editsParsedXml2 = cacheDir + "/editsRecoveredParsed2.xml";
+
+ String editsParsedXml = folder.newFile("editsRecoveredParsed.xml")
+ .getAbsolutePath();
+ String editsReparsed = folder.newFile("editsRecoveredReparsed")
+ .getAbsolutePath();
+ String editsParsedXml2 = folder.newFile("editsRecoveredParsed2.xml")
+ .getAbsolutePath();
// Can't read the corrupted file without recovery mode
assertEquals(-1, runOev(edits, editsParsedXml, "xml", false));
-
+
// parse to XML then back to binary
assertEquals(0, runOev(edits, editsParsedXml, "xml", true));
- assertEquals(0, runOev(editsParsedXml, editsReparsed, "binary", false));
+ assertEquals(0, runOev(editsParsedXml, editsReparsed, "binary", false));
assertEquals(0, runOev(editsReparsed, editsParsedXml2, "xml", false));
// judgment time
assertTrue("Test round trip",
- filesEqualIgnoreTrailingZeros(editsParsedXml, editsParsedXml2));
+ filesEqualIgnoreTrailingZeros(editsParsedXml, editsParsedXml2));
- // removes edits so do this at the end
- nnHelper.shutdownCluster();
-
- LOG.info("END");
}
@Test
public void testStored() throws IOException {
-
- LOG.info("START - testing with stored reference edits");
-
// reference edits stored with source code (see build.xml)
+ final String cacheDir = System.getProperty("test.cache.data",
+ "build/test/cache");
// binary, XML, reparsed binary
- String editsStored = cacheDir + "/editsStored";
- String editsStoredParsedXml = cacheDir + "/editsStoredParsed.xml";
- String editsStoredReparsed = cacheDir + "/editsStoredReparsed";
+ String editsStored = cacheDir + "/editsStored";
+ String editsStoredParsedXml = cacheDir + "/editsStoredParsed.xml";
+ String editsStoredReparsed = cacheDir + "/editsStoredReparsed";
// reference XML version of editsStored (see build.xml)
- String editsStoredXml = cacheDir + "/editsStored.xml";
-
+ String editsStoredXml = cacheDir + "/editsStored.xml";
+
// parse to XML then back to binary
assertEquals(0, runOev(editsStored, editsStoredParsedXml, "xml", false));
- assertEquals(0, runOev(editsStoredParsedXml, editsStoredReparsed,
- "binary", false));
+ assertEquals(0,
+ runOev(editsStoredParsedXml, editsStoredReparsed, "binary", false));
// judgement time
+ assertTrue("Edits " + editsStored + " should have all op codes",
+ hasAllOpCodes(editsStored));
+ assertTrue("Reference XML edits and parsed to XML should be same",
+ filesEqual(editsStoredXml, editsStoredParsedXml));
assertTrue(
- "Edits " + editsStored + " should have all op codes",
- hasAllOpCodes(editsStored));
- assertTrue(
- "Reference XML edits and parsed to XML should be same",
- filesEqual(editsStoredXml, editsStoredParsedXml));
- assertTrue(
- "Reference edits and reparsed (bin to XML to bin) should be same",
- filesEqualIgnoreTrailingZeros(editsStored, editsStoredReparsed));
-
- LOG.info("END");
+ "Reference edits and reparsed (bin to XML to bin) should be same",
+ filesEqualIgnoreTrailingZeros(editsStored, editsStoredReparsed));
}
/**
@@ -233,22 +205,17 @@ public class TestOfflineEditsViewer {
OfflineEditsViewer oev = new OfflineEditsViewer();
if (oev.go(inFilename, outFilename, "stats", new Flags(), visitor) != 0)
return false;
- LOG.info("Statistics for " + inFilename + "\n" +
- visitor.getStatisticsString());
-
+ LOG.info("Statistics for " + inFilename + "\n"
+ + visitor.getStatisticsString());
+
boolean hasAllOpCodes = true;
- for(FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) {
+ for (FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) {
// don't need to test obsolete opCodes
- if(obsoleteOpCodes.containsKey(opCode)) {
+ if (skippedOps.contains(opCode))
continue;
- } else if (missingOpCodes.containsKey(opCode)) {
- continue;
- } else if (opCode == FSEditLogOpCodes.OP_INVALID) {
- continue;
- }
Long count = visitor.getStatistics().get(opCode);
- if((count == null) || (count == 0)) {
+ if ((count == null) || (count == 0)) {
hasAllOpCodes = false;
LOG.info("Opcode " + opCode + " not tested in " + inFilename);
}
@@ -257,9 +224,9 @@ public class TestOfflineEditsViewer {
}
/**
- * Compare two files, ignore trailing zeros at the end,
- * for edits log the trailing zeros do not make any difference,
- * throw exception is the files are not same
+ * Compare two files, ignoring trailing zeros at the end; for edit logs the
+ * trailing zeros do not make any difference. Throws an exception if the
+ * files are not the same.
*
* @param filenameSmall first file to compare (doesn't have to be smaller)
* @param filenameLarge second file to compare (doesn't have to be larger)
@@ -271,7 +238,7 @@ public class TestOfflineEditsViewer {
ByteBuffer large = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameLarge));
// now correct if it's otherwise
- if(small.capacity() > large.capacity()) {
+ if (small.capacity() > large.capacity()) {
ByteBuffer tmpByteBuffer = small;
small = large;
large = tmpByteBuffer;
@@ -288,13 +255,15 @@ public class TestOfflineEditsViewer {
large.limit(small.capacity());
// compares position to limit
- if(!small.equals(large)) { return false; }
+ if (!small.equals(large)) {
+ return false;
+ }
// everything after limit should be 0xFF
int i = large.limit();
large.clear();
- for(; i < large.capacity(); i++) {
- if(large.get(i) != FSEditLogOpCodes.OP_INVALID.getOpCode()) {
+ for (; i < large.capacity(); i++) {
+ if (large.get(i) != FSEditLogOpCodes.OP_INVALID.getOpCode()) {
return false;
}
}
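For readers outside the Hadoop tree, the comparison rule implemented above can be sketched standalone: two edit-log byte sequences are treated as equal when they match over the shorter length and the remainder of the longer one is nothing but OP_INVALID (0xFF) padding. The helper below is a minimal illustration under that assumption, not the test's actual code.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class TrailingPaddingCompare {
  // edit logs pad with the OP_INVALID opcode byte (0xFF); an assumption
  // mirrored from the loop above
  private static final byte OP_INVALID = (byte) 0xFF;

  static boolean equalIgnoreTrailingPadding(byte[] a, byte[] b) {
    byte[] small = (a.length <= b.length) ? a : b;
    byte[] large = (a.length <= b.length) ? b : a;
    // the shared prefix must match byte for byte
    for (int i = 0; i < small.length; i++) {
      if (small[i] != large[i]) {
        return false;
      }
    }
    // everything past the shared prefix must be padding
    for (int i = small.length; i < large.length; i++) {
      if (large[i] != OP_INVALID) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) throws IOException {
    byte[] x = Files.readAllBytes(Paths.get(args[0]));
    byte[] y = Files.readAllBytes(Paths.get(args[1]));
    System.out.println(equalIgnoreTrailingPadding(x, y));
  }
}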
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 52aefce295a..668b4a893fc 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -196,6 +196,8 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5550. Task Status message (reporter.setStatus) not shown in UI
with Hadoop 2.0 (Gera Shegalov via Sandy Ryza)
+ MAPREDUCE-3310. Custom grouping comparator cannot be set for Combiners (tucu)
+
OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
@@ -267,6 +269,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5685. Fixed a bug with JobContext getCacheFiles API inside the
WrappedReducer class. (Yi Song via vinodkv)
+ MAPREDUCE-5689. MRAppMaster does not preempt reducers when scheduled maps
+ cannot be fulfilled. (lohit via kasha)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 18491fdbf1d..a8ee06b3b48 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -229,7 +229,8 @@ public class RMContainerAllocator extends RMContainerRequestor
int completedMaps = getJob().getCompletedMaps();
int completedTasks = completedMaps + getJob().getCompletedReduces();
- if (lastCompletedTasks != completedTasks) {
+ if ((lastCompletedTasks != completedTasks) ||
+ (scheduledRequests.maps.size() > 0)) {
lastCompletedTasks = completedTasks;
recalculateReduceSchedule = true;
}
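The one-line change above is the heart of MAPREDUCE-5689: previously recalculateReduceSchedule was only set when the completed-task count moved, so a job whose scheduled maps were starved by already-running reducers never re-ran the ramp-down logic. A hedged sketch of the trigger, with stand-ins for the allocator's fields rather than the real class:

public class ReduceScheduleTrigger {
  public static void main(String[] args) {
    int lastCompletedTasks = 10;
    int completedTasks = 10;          // unchanged since the last heartbeat
    int scheduledMaps = 2;            // maps still waiting for containers
    boolean recalculateReduceSchedule = false;

    // After this patch, pending maps alone force a recalculation, which
    // lets scheduleReduces() ramp down (preempt) reducers to free space.
    if ((lastCompletedTasks != completedTasks) || (scheduledMaps > 0)) {
      lastCompletedTasks = completedTasks;
      recalculateReduceSchedule = true;
    }
    System.out.println("recalculate = " + recalculateReduceSchedule);
  }
}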
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
index 3a6644e4349..3eb5222865c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
@@ -1604,6 +1604,21 @@ public class TestRMContainerAllocator {
numPendingReduces,
maxReduceRampupLimit, reduceSlowStart);
verify(allocator).rampDownReduces(anyInt());
+
+ // Test reduce ramp-down when maps are still waiting to be scheduled.
+ // This second call to scheduleReduces() should ramp down reducers
+ // again, bringing the total rampDownReduces invocations to two.
+ scheduledMaps = 2;
+ assignedReduces = 2;
+ doReturn(10 * 1024).when(allocator).getMemLimit();
+ allocator.scheduleReduces(
+ totalMaps, succeededMaps,
+ scheduledMaps, scheduledReduces,
+ assignedMaps, assignedReduces,
+ mapResourceReqt, reduceResourceReqt,
+ numPendingReduces,
+ maxReduceRampupLimit, reduceSlowStart);
+ verify(allocator, times(2)).rampDownReduces(anyInt());
}
private static class RecalculateContainerAllocator extends MyContainerAllocator {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
index 5bae686ab20..fd9e95d4916 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
@@ -949,12 +949,29 @@ public class JobConf extends Configuration {
return get(KeyFieldBasedPartitioner.PARTITIONER_OPTIONS);
}
+ /**
+ * Get the user defined {@link WritableComparable} comparator for
+ * grouping keys of inputs to the combiner.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see #setCombinerKeyGroupingComparator(Class) for details.
+ */
+ public RawComparator getCombinerKeyGroupingComparator() {
+ Class<? extends RawComparator> theClass = getClass(
+ JobContext.COMBINER_GROUP_COMPARATOR_CLASS, null, RawComparator.class);
+ if (theClass == null) {
+ return getOutputKeyComparator();
+ }
+
+ return ReflectionUtils.newInstance(theClass, this);
+ }
+
/**
* Get the user defined {@link WritableComparable} comparator for
* grouping keys of inputs to the reduce.
*
* @return comparator set by the user for grouping values.
- * @see #setOutputValueGroupingComparator(Class) for details.
+ * @see #setOutputValueGroupingComparator(Class) for details.
*/
public RawComparator getOutputValueGroupingComparator() {
Class<? extends RawComparator> theClass = getClass(
@@ -966,6 +983,37 @@ public class JobConf extends Configuration {
return ReflectionUtils.newInstance(theClass, this);
}
+ /**
+ * Set the user defined {@link RawComparator} comparator for
+ * grouping keys in the input to the combiner.
+ *
+ * This comparator should be provided if the equivalence rules for keys
+ * for sorting the intermediates are different from those for grouping keys
+ * before each call to
+ * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.
+ *
+ * For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
+ * in a single call to the reduce function if K1 and K2 compare as equal.
+ *
+ * Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
+ * how keys are sorted, this can be used in conjunction to simulate
+ * secondary sort on values.
+ *
+ * Note: This is not a guarantee of the combiner sort being
+ * stable in any sense. (In any case, with the order of available
+ * map-outputs to the combiner being non-deterministic, it wouldn't make
+ * that much sense.)
+ *
+ * @param theClass the comparator class to be used for grouping keys for the
+ * combiner. It should implement <code>RawComparator</code>.
+ * @see #setOutputKeyComparatorClass(Class)
+ */
+ public void setCombinerKeyGroupingComparator(
+ Class<? extends RawComparator> theClass) {
+ setClass(JobContext.COMBINER_GROUP_COMPARATOR_CLASS,
+ theClass, RawComparator.class);
+ }
+
/**
* Set the user defined {@link RawComparator} comparator for
* grouping keys in the input to the reduce.
@@ -989,7 +1037,8 @@ public class JobConf extends Configuration {
*
* @param theClass the comparator class to be used for grouping keys.
* It should implement <code>RawComparator</code>.
- * @see #setOutputKeyComparatorClass(Class)
+ * @see #setOutputKeyComparatorClass(Class)
+ * @see #setCombinerKeyGroupingComparator(Class)
*/
public void setOutputValueGroupingComparator(
Class<? extends RawComparator> theClass) {
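The javadoc above leaves the comparator itself to the reader. A minimal sketch, assuming composite Text keys of the form "naturalKey|secondaryKey" (the same convention the tests added later in this patch use): group on the natural-key prefix only, so the combiner sees every value for one natural key in a single call. This class is illustrative, not part of the patch.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class PrefixGroupingComparator extends WritableComparator {
  public PrefixGroupingComparator() {
    super(Text.class, true); // deserialize keys before comparing
  }

  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    String s1 = a.toString();
    String s2 = b.toString();
    // compare only the part before the '|' separator
    return s1.substring(0, s1.indexOf('|'))
        .compareTo(s2.substring(0, s2.indexOf('|')));
  }
}

Wired in with job.setCombinerKeyGroupingComparator(PrefixGroupingComparator.class), while setOutputKeyComparatorClass keeps sorting on the full composite key.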
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 660ffc65ad3..72cd41c9ea6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -1575,7 +1575,8 @@ abstract public class Task implements Writable, Configurable {
combinerClass = cls;
keyClass = (Class<K>) job.getMapOutputKeyClass();
valueClass = (Class<V>) job.getMapOutputValueClass();
- comparator = (RawComparator<K>) job.getOutputKeyComparator();
+ comparator = (RawComparator<K>)
+ job.getCombinerKeyGroupingComparator();
}
@SuppressWarnings("unchecked")
@@ -1624,7 +1625,7 @@ abstract public class Task implements Writable, Configurable {
this.taskId = taskId;
keyClass = (Class<K>) context.getMapOutputKeyClass();
valueClass = (Class<V>) context.getMapOutputValueClass();
- comparator = (RawComparator<K>) context.getSortComparator();
+ comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();
this.committer = committer;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 78c6b4b1a9c..115a2b9fee3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -948,11 +948,27 @@ public class Job extends JobContextImpl implements JobContext {
conf.setOutputValueClass(theClass);
}
+ /**
+ * Define the comparator that controls which keys are grouped together
+ * for a single call to combiner,
+ * {@link Reducer#reduce(Object, Iterable,
+ * org.apache.hadoop.mapreduce.Reducer.Context)}
+ *
+ * @param cls the raw comparator to use
+ * @throws IllegalStateException if the job is submitted
+ */
+ public void setCombinerKeyGroupingComparatorClass(
+ Class<? extends RawComparator> cls) throws IllegalStateException {
+ ensureState(JobState.DEFINE);
+ conf.setCombinerKeyGroupingComparator(cls);
+ }
+
/**
* Define the comparator that controls how the keys are sorted before they
* are passed to the {@link Reducer}.
* @param cls the raw comparator
* @throws IllegalStateException if the job is submitted
+ * @see #setCombinerKeyGroupingComparatorClass(Class)
*/
public void setSortComparatorClass(Class<? extends RawComparator> cls
) throws IllegalStateException {
@@ -967,6 +983,7 @@ public class Job extends JobContextImpl implements JobContext {
* org.apache.hadoop.mapreduce.Reducer.Context)}
* @param cls the raw comparator to use
* @throws IllegalStateException if the job is submitted
+ * @see #setCombinerKeyGroupingComparatorClass(Class)
*/
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
) throws IllegalStateException {
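A hedged end-to-end wiring sketch for the new mapreduce-API setter. PrefixGroupingComparator is the hypothetical comparator sketched earlier, and Reduce stands in for a max-per-key reducer like the one in the tests added below; neither name is part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class CombinerGroupingDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "combiner-grouping-demo");
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    // sorting still uses the full composite key...
    job.setSortComparatorClass(Text.Comparator.class);
    // ...but the combiner groups on the natural-key prefix only
    job.setCombinerKeyGroupingComparatorClass(PrefixGroupingComparator.class);
    job.setCombinerClass(Reduce.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}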
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
index 4842e20b9c4..836f1829079 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
@@ -167,13 +167,23 @@ public interface JobContext extends MRJobConfig {
*/
public String getJar();
- /**
- * Get the user defined {@link RawComparator} comparator for
- * grouping keys of inputs to the reduce.
- *
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the combiner.
+ *
* @return comparator set by the user for grouping values.
- * @see Job#setGroupingComparatorClass(Class) for details.
+ * @see Job#setCombinerKeyGroupingComparatorClass(Class)
*/
+ public RawComparator<?> getCombinerKeyGroupingComparator();
+
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the reduce.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see Job#setGroupingComparatorClass(Class)
+ * @see #getCombinerKeyGroupingComparator()
+ */
public RawComparator<?> getGroupingComparator();
/**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index e696b865533..1be7ba3e3b9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -93,6 +93,8 @@ public interface MRJobConfig {
public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
+ public static final String COMBINER_GROUP_COMPARATOR_CLASS = "mapreduce.job.combiner.group.comparator.class";
+
public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
public static final String WORKING_DIR = "mapreduce.job.working.dir";
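The same setting can also be made through raw configuration. A sketch using the property string added above, with PrefixGroupingComparator again being the hypothetical comparator from earlier:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;

public class CombinerGroupingConfDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // mirrors MRJobConfig.COMBINER_GROUP_COMPARATOR_CLASS defined above
    conf.setClass("mapreduce.job.combiner.group.comparator.class",
        PrefixGroupingComparator.class, RawComparator.class);
    System.out.println(
        conf.get("mapreduce.job.combiner.group.comparator.class"));
  }
}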
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
index 598bb936060..ea2c77ace9b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
@@ -166,6 +166,11 @@ class ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
return base.getFileTimestamps();
}
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return base.getCombinerKeyGroupingComparator();
+ }
+
@Override
public RawComparator<?> getGroupingComparator() {
return base.getGroupingComparator();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
index 8d6648468e8..5e9a1add874 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
@@ -159,6 +159,11 @@ class ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
return base.getFileTimestamps();
}
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return base.getCombinerKeyGroupingComparator();
+ }
+
@Override
public RawComparator<?> getGroupingComparator() {
return base.getGroupingComparator();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
index 95c4b90c0f9..8865a36c31d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
@@ -168,6 +168,11 @@ public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
return mapContext.getFileTimestamps();
}
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return mapContext.getCombinerKeyGroupingComparator();
+ }
+
@Override
public RawComparator<?> getGroupingComparator() {
return mapContext.getGroupingComparator();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
index 39178642f24..185c135c2e1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
@@ -161,6 +161,11 @@ public class WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
return reduceContext.getFileTimestamps();
}
+ @Override
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return reduceContext.getCombinerKeyGroupingComparator();
+ }
+
@Override
public RawComparator<?> getGroupingComparator() {
return reduceContext.getGroupingComparator();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
index b4c6dca5545..247c2f2029b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
@@ -252,6 +252,17 @@ public class JobContextImpl implements JobContext {
return conf.getJar();
}
+ /**
+ * Get the user defined {@link RawComparator} comparator for
+ * grouping keys of inputs to the combiner.
+ *
+ * @return comparator set by the user for grouping values.
+ * @see Job#setCombinerKeyGroupingComparatorClass(Class) for details.
+ */
+ public RawComparator<?> getCombinerKeyGroupingComparator() {
+ return conf.getCombinerKeyGroupingComparator();
+ }
+
/**
* Get the user defined {@link RawComparator} comparator for
* grouping keys of inputs to the reduce.
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index ca3bed93998..a821e4d1b8a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -582,7 +582,7 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
RawComparator<K> comparator =
- (RawComparator<K>)job.getOutputKeyComparator();
+ (RawComparator<K>)job.getCombinerKeyGroupingComparator();
try {
CombineValuesIterator<K, V> values = new CombineValuesIterator<K, V>(
kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
new file mode 100644
index 00000000000..96919bef68f
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestOldCombinerGrouping.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestOldCombinerGrouping {
+ private static String TEST_ROOT_DIR =
+ new File("build", UUID.randomUUID().toString()).getAbsolutePath();
+
+ public static class Map implements
+ Mapper<LongWritable, Text, Text, LongWritable> {
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, LongWritable> output, Reporter reporter)
+ throws IOException {
+ String v = value.toString();
+ String k = v.substring(0, v.indexOf(","));
+ v = v.substring(v.indexOf(",") + 1);
+ output.collect(new Text(k), new LongWritable(Long.parseLong(v)));
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ }
+ }
+
+ public static class Reduce implements
+ Reducer<Text, LongWritable, Text, LongWritable> {
+
+ @Override
+ public void reduce(Text key, Iterator<LongWritable> values,
+ OutputCollector<Text, LongWritable> output, Reporter reporter)
+ throws IOException {
+ LongWritable maxValue = null;
+ while (values.hasNext()) {
+ LongWritable value = values.next();
+ if (maxValue == null) {
+ maxValue = value;
+ } else if (value.compareTo(maxValue) > 0) {
+ maxValue = value;
+ }
+ }
+ output.collect(key, maxValue);
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void configure(JobConf job) {
+ }
+ }
+
+ public static class Combiner extends Reduce {
+ }
+
+ public static class GroupComparator implements RawComparator<Text> {
+ @Override
+ public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3,
+ int i4) {
+ byte[] b1 = new byte[i2];
+ System.arraycopy(bytes, i, b1, 0, i2);
+
+ byte[] b2 = new byte[i4];
+ System.arraycopy(bytes2, i3, b2, 0, i4);
+
+ return compare(new Text(new String(b1)), new Text(new String(b2)));
+ }
+
+ @Override
+ public int compare(Text o1, Text o2) {
+ String s1 = o1.toString();
+ String s2 = o2.toString();
+ s1 = s1.substring(0, s1.indexOf("|"));
+ s2 = s2.substring(0, s2.indexOf("|"));
+ return s1.compareTo(s2);
+ }
+
+ }
+
+ @Test
+ public void testCombiner() throws Exception {
+ if (!new File(TEST_ROOT_DIR).mkdirs()) {
+ throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR);
+ }
+ File in = new File(TEST_ROOT_DIR, "input");
+ if (!in.mkdirs()) {
+ throw new RuntimeException("Could not create test dir: " + in);
+ }
+ File out = new File(TEST_ROOT_DIR, "output");
+ PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt")));
+ pw.println("A|a,1");
+ pw.println("A|b,2");
+ pw.println("B|a,3");
+ pw.println("B|b,4");
+ pw.println("B|c,5");
+ pw.close();
+ JobConf job = new JobConf();
+ job.set("mapreduce.framework.name", "local");
+ TextInputFormat.setInputPaths(job, new Path(in.getPath()));
+ TextOutputFormat.setOutputPath(job, new Path(out.getPath()));
+ job.setMapperClass(Map.class);
+ job.setReducerClass(Reduce.class);
+ job.setInputFormat(TextInputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(LongWritable.class);
+ job.setOutputFormat(TextOutputFormat.class);
+ job.setOutputValueGroupingComparator(GroupComparator.class);
+
+ job.setCombinerClass(Combiner.class);
+ job.setCombinerKeyGroupingComparator(GroupComparator.class);
+ job.setInt("min.num.spills.for.combine", 0);
+
+ JobClient client = new JobClient(job);
+ RunningJob runningJob = client.submitJob(job);
+ runningJob.waitForCompletion();
+ if (runningJob.isSuccessful()) {
+ Counters counters = runningJob.getCounters();
+
+ long combinerInputRecords = counters.getGroup(
+ "org.apache.hadoop.mapreduce.TaskCounter").
+ getCounter("COMBINE_INPUT_RECORDS");
+ long combinerOutputRecords = counters.getGroup(
+ "org.apache.hadoop.mapreduce.TaskCounter").
+ getCounter("COMBINE_OUTPUT_RECORDS");
+ Assert.assertTrue(combinerInputRecords > 0);
+ Assert.assertTrue(combinerInputRecords > combinerOutputRecords);
+
+ BufferedReader br = new BufferedReader(new FileReader(
+ new File(out, "part-00000")));
+ Set<String> output = new HashSet<String>();
+ String line = br.readLine();
+ Assert.assertNotNull(line);
+ output.add(line.substring(0, 1) + line.substring(4, 5));
+ line = br.readLine();
+ Assert.assertNotNull(line);
+ output.add(line.substring(0, 1) + line.substring(4, 5));
+ line = br.readLine();
+ Assert.assertNull(line);
+ br.close();
+
+ Set<String> expected = new HashSet<String>();
+ expected.add("A2");
+ expected.add("B5");
+
+ Assert.assertEquals(expected, output);
+
+ } else {
+ Assert.fail("Job failed");
+ }
+ }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java
new file mode 100644
index 00000000000..c4b734bdc5b
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestNewCombinerGrouping.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import junit.framework.Assert;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+public class TestNewCombinerGrouping {
+ private static String TEST_ROOT_DIR =
+ new File("build", UUID.randomUUID().toString()).getAbsolutePath();
+
+ public static class Map extends
+ Mapper<LongWritable, Text, Text, LongWritable> {
+
+ @Override
+ protected void map(LongWritable key, Text value,
+ Context context)
+ throws IOException, InterruptedException {
+ String v = value.toString();
+ String k = v.substring(0, v.indexOf(","));
+ v = v.substring(v.indexOf(",") + 1);
+ context.write(new Text(k), new LongWritable(Long.parseLong(v)));
+ }
+ }
+
+ public static class Reduce extends
+ Reducer<Text, LongWritable, Text, LongWritable> {
+
+ @Override
+ protected void reduce(Text key, Iterable<LongWritable> values,
+ Context context)
+ throws IOException, InterruptedException {
+ LongWritable maxValue = null;
+ for (LongWritable value : values) {
+ if (maxValue == null) {
+ maxValue = value;
+ } else if (value.compareTo(maxValue) > 0) {
+ maxValue = value;
+ }
+ }
+ context.write(key, maxValue);
+ }
+ }
+
+ public static class Combiner extends Reduce {
+ }
+
+ public static class GroupComparator implements RawComparator<Text> {
+ @Override
+ public int compare(byte[] bytes, int i, int i2, byte[] bytes2, int i3,
+ int i4) {
+ byte[] b1 = new byte[i2];
+ System.arraycopy(bytes, i, b1, 0, i2);
+
+ byte[] b2 = new byte[i4];
+ System.arraycopy(bytes2, i3, b2, 0, i4);
+
+ return compare(new Text(new String(b1)), new Text(new String(b2)));
+ }
+
+ @Override
+ public int compare(Text o1, Text o2) {
+ String s1 = o1.toString();
+ String s2 = o2.toString();
+ s1 = s1.substring(0, s1.indexOf("|"));
+ s2 = s2.substring(0, s2.indexOf("|"));
+ return s1.compareTo(s2);
+ }
+
+ }
+
+ @Test
+ public void testCombiner() throws Exception {
+ if (!new File(TEST_ROOT_DIR).mkdirs()) {
+ throw new RuntimeException("Could not create test dir: " + TEST_ROOT_DIR);
+ }
+ File in = new File(TEST_ROOT_DIR, "input");
+ if (!in.mkdirs()) {
+ throw new RuntimeException("Could not create test dir: " + in);
+ }
+ File out = new File(TEST_ROOT_DIR, "output");
+ PrintWriter pw = new PrintWriter(new FileWriter(new File(in, "data.txt")));
+ pw.println("A|a,1");
+ pw.println("A|b,2");
+ pw.println("B|a,3");
+ pw.println("B|b,4");
+ pw.println("B|c,5");
+ pw.close();
+ JobConf conf = new JobConf();
+ conf.set("mapreduce.framework.name", "local");
+ Job job = new Job(conf);
+ TextInputFormat.setInputPaths(job, new Path(in.getPath()));
+ TextOutputFormat.setOutputPath(job, new Path(out.getPath()));
+
+ job.setMapperClass(Map.class);
+ job.setReducerClass(Reduce.class);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(LongWritable.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ job.setGroupingComparatorClass(GroupComparator.class);
+
+ job.setCombinerKeyGroupingComparatorClass(GroupComparator.class);
+ job.setCombinerClass(Combiner.class);
+ job.getConfiguration().setInt("min.num.spills.for.combine", 0);
+
+ job.submit();
+ job.waitForCompletion(false);
+ if (job.isSuccessful()) {
+ Counters counters = job.getCounters();
+
+ long combinerInputRecords = counters.findCounter(
+ "org.apache.hadoop.mapreduce.TaskCounter",
+ "COMBINE_INPUT_RECORDS").getValue();
+ long combinerOutputRecords = counters.findCounter(
+ "org.apache.hadoop.mapreduce.TaskCounter",
+ "COMBINE_OUTPUT_RECORDS").getValue();
+ Assert.assertTrue(combinerInputRecords > 0);
+ Assert.assertTrue(combinerInputRecords > combinerOutputRecords);
+
+ BufferedReader br = new BufferedReader(new FileReader(
+ new File(out, "part-r-00000")));
+ Set<String> output = new HashSet<String>();
+ String line = br.readLine();
+ Assert.assertNotNull(line);
+ output.add(line.substring(0, 1) + line.substring(4, 5));
+ line = br.readLine();
+ Assert.assertNotNull(line);
+ output.add(line.substring(0, 1) + line.substring(4, 5));
+ line = br.readLine();
+ Assert.assertNull(line);
+ br.close();
+
+ Set<String> expected = new HashSet<String>();
+ expected.add("A2");
+ expected.add("B5");
+
+ Assert.assertEquals(expected, output);
+
+ } else {
+ Assert.fail("Job failed");
+ }
+ }
+
+}
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 33225f06dbb..44fefb21ac5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -55,6 +55,9 @@ Release 2.4.0 - UNRELEASED
YARN-1028. Added FailoverProxyProvider capability to ResourceManager to help
with RM failover. (Karthik Kambatla via vinodkv)
+ YARN-1029. Added embedded leader election in the ResourceManager. (Karthik
+ Kambatla via vinodkv)
+
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
@@ -197,6 +200,11 @@ Release 2.4.0 - UNRELEASED
YARN-1493. Changed ResourceManager and Scheduler interfacing to recognize
app-attempts separately from apps. (Jian He via vinodkv)
+ YARN-1482. Modified WebApplicationProxy to make it work across ResourceManager
+ fail-over. (Xuan Gong via vinodkv)
+
+ YARN-1568. Rename clusterid to clusterId in ActiveRMInfoProto (kasha)
+
OPTIMIZATIONS
BUG FIXES
@@ -286,6 +294,18 @@ Release 2.4.0 - UNRELEASED
YARN-1549. Fixed a bug in ResourceManager's ApplicationMasterService that
was causing unamanged AMs to not finish correctly. (haosdent via vinodkv)
+ YARN-1559. Race between ServerRMProxy and ClientRMProxy setting
+ RMProxy#INSTANCE. (kasha and vinodkv via kasha)
+
+ YARN-1560. Fixed TestYarnClient#testAMMRTokens failure with null AMRM token.
+ (Ted Yu via jianhe)
+
+ YARN-1409. NonAggregatingLogHandler can throw RejectedExecutionException
+ (Tsuyoshi OZAWA via jlowe)
+
+ YARN-1293. Fixed TestContainerLaunch#testInvalidEnvSyntaxDiagnostics failure
+ caused by non-English system locale. (Tsuyoshi OZAWA via jianhe)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 486bebfec50..74ca61b8578 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -309,13 +309,4 @@
-
-
-
-
-
-
-
-
-
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
index cbdb8b34c88..3cbde93a3e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
@@ -51,6 +51,22 @@ public class HAUtil {
YarnConfiguration.DEFAULT_RM_HA_ENABLED);
}
+ public static boolean isAutomaticFailoverEnabled(Configuration conf) {
+ return conf.getBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED,
+ YarnConfiguration.DEFAULT_AUTO_FAILOVER_ENABLED);
+ }
+
+ public static boolean isAutomaticFailoverEnabledAndEmbedded(
+ Configuration conf) {
+ return isAutomaticFailoverEnabled(conf) &&
+ isAutomaticFailoverEmbedded(conf);
+ }
+
+ public static boolean isAutomaticFailoverEmbedded(Configuration conf) {
+ return conf.getBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED,
+ YarnConfiguration.DEFAULT_AUTO_FAILOVER_EMBEDDED);
+ }
+
/**
* Verify configuration for Resource Manager HA.
* @param conf Configuration
@@ -162,8 +178,7 @@ public class HAUtil {
* @param conf Configuration. Please use verifyAndSetRMHAId to check.
* @return RM Id on success
*/
- @VisibleForTesting
- static String getRMHAId(Configuration conf) {
+ public static String getRMHAId(Configuration conf) {
return conf.get(YarnConfiguration.RM_HA_ID);
}
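A small sketch of how a caller might combine the new HAUtil helpers; the configuration values here are illustrative only, not defaults:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class FailoverModeCheck {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED, true);
    conf.set(YarnConfiguration.RM_HA_ID, "rm1");

    if (HAUtil.isAutomaticFailoverEnabledAndEmbedded(conf)) {
      // this is where the RM would start its embedded elector
      System.out.println("embedded leader election active for "
          + HAUtil.getRMHAId(conf));
    }
  }
}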
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index e96c217b8cc..4adba7983ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -59,7 +59,7 @@ public class YarnConfiguration extends Configuration {
public static final String IPC_PREFIX = YARN_PREFIX + "ipc.";
/** Factory to create client IPC classes.*/
- public static final String IPC_CLIENT_FACTORY_CLASS =
+ public static final String IPC_CLIENT_FACTORY_CLASS =
IPC_PREFIX + "client.factory.class";
public static final String DEFAULT_IPC_CLIENT_FACTORY_CLASS =
"org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl";
@@ -87,6 +87,8 @@ public class YarnConfiguration extends Configuration {
////////////////////////////////
public static final String RM_PREFIX = "yarn.resourcemanager.";
+ public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster-id";
+
/** The address of the applications manager interface in the RM.*/
public static final String RM_ADDRESS =
RM_PREFIX + "address";
@@ -278,6 +280,36 @@ public class YarnConfiguration extends Configuration {
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
+ /** Zookeeper interaction configs */
+ public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
+
+ public static final String RM_ZK_ADDRESS = RM_ZK_PREFIX + "address";
+
+ public static final String RM_ZK_NUM_RETRIES = RM_ZK_PREFIX + "num-retries";
+ public static final int DEFAULT_ZK_RM_NUM_RETRIES = 500;
+
+ public static final String RM_ZK_RETRY_INTERVAL_MS =
+ RM_ZK_PREFIX + "retry-interval-ms";
+ public static final long DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 2000;
+
+ public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
+ public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
+
+ public static final String RM_ZK_ACL = RM_ZK_PREFIX + "acl";
+ public static final String DEFAULT_RM_ZK_ACL = "world:anyone:rwcda";
+
+ public static final String ZK_STATE_STORE_PREFIX =
+ RM_PREFIX + "zk-state-store.";
+
+ /** Parent znode path under which ZKRMStateStore will create znodes */
+ public static final String ZK_RM_STATE_STORE_PARENT_PATH =
+ ZK_STATE_STORE_PREFIX + "parent-path";
+ public static final String DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH = "/rmstore";
+
+ /** Root node ACLs for fencing */
+ public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL =
+ ZK_STATE_STORE_PREFIX + "root-node.acl";
+
/** HA related configs */
public static final String RM_HA_PREFIX = RM_PREFIX + "ha.";
public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
@@ -296,6 +328,22 @@ public class YarnConfiguration extends Configuration {
HttpConfig.isSecure() ? RM_WEBAPP_HTTPS_ADDRESS
: RM_WEBAPP_ADDRESS));
+ public static final String AUTO_FAILOVER_PREFIX =
+ RM_HA_PREFIX + "automatic-failover.";
+
+ public static final String AUTO_FAILOVER_ENABLED =
+ AUTO_FAILOVER_PREFIX + "enabled";
+ public static final boolean DEFAULT_AUTO_FAILOVER_ENABLED = false;
+
+ public static final String AUTO_FAILOVER_EMBEDDED =
+ AUTO_FAILOVER_PREFIX + "embedded";
+ public static final boolean DEFAULT_AUTO_FAILOVER_EMBEDDED = false;
+
+ public static final String AUTO_FAILOVER_ZK_BASE_PATH =
+ AUTO_FAILOVER_PREFIX + "zk-base-path";
+ public static final String DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH =
+ "/yarn-leader-election";
+
public static final String CLIENT_FAILOVER_PREFIX =
YARN_PREFIX + "client.failover-";
public static final String CLIENT_FAILOVER_PROXY_PROVIDER =
@@ -334,36 +382,6 @@ public class YarnConfiguration extends Configuration {
+ "fs.state-store.retry-policy-spec";
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
"2000, 500";
- /**
- * Comma separated host:port pairs, each corresponding to a ZK server for
- * ZKRMStateStore
- */
- public static final String ZK_STATE_STORE_PREFIX =
- RM_PREFIX + "zk-state-store.";
- public static final String ZK_RM_STATE_STORE_NUM_RETRIES =
- ZK_STATE_STORE_PREFIX + "num-retries";
- public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 500;
- /** retry interval when connecting to zookeeper*/
- public static final String ZK_RM_STATE_STORE_RETRY_INTERVAL_MS =
- ZK_STATE_STORE_PREFIX + "retry-interval-ms";
- public static final long DEFAULT_ZK_RM_STATE_STORE_RETRY_INTERVAL_MS = 2000;
- public static final String ZK_RM_STATE_STORE_ADDRESS =
- ZK_STATE_STORE_PREFIX + "address";
- /** Timeout in millisec for ZK server connection for ZKRMStateStore */
- public static final String ZK_RM_STATE_STORE_TIMEOUT_MS =
- ZK_STATE_STORE_PREFIX + "timeout-ms";
- public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000;
- /** Parent znode path under which ZKRMStateStore will create znodes */
- public static final String ZK_RM_STATE_STORE_PARENT_PATH =
- ZK_STATE_STORE_PREFIX + "parent-path";
- public static final String DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH = "/rmstore";
- /** ACL for znodes in ZKRMStateStore */
- public static final String ZK_RM_STATE_STORE_ACL =
- ZK_STATE_STORE_PREFIX + "acl";
- public static final String DEFAULT_ZK_RM_STATE_STORE_ACL =
- "world:anyone:rwcda";
- public static final String ZK_RM_STATE_STORE_ROOT_NODE_ACL =
- ZK_STATE_STORE_PREFIX + "root-node.acl";
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
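Taken together, the new knobs enable embedded automatic failover roughly the way the testAutomaticFailover case below does. A hedged configuration sketch, with the cluster id and timeout values borrowed from that test rather than prescribed:

import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class EmbeddedFailoverConfig {
  public static YarnConfiguration create(String zkQuorum) {
    YarnConfiguration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED, true);
    // the cluster id scopes the election znode under the zk-base-path
    conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster");
    conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkQuorum);
    conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 2000);
    return conf;
  }
}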
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
index 62bd649c209..e27f12a836a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
@@ -133,3 +133,11 @@ message RMStateVersionProto {
optional int32 major_version = 1;
optional int32 minor_version = 2;
}
+
+//////////////////////////////////////////////////////////////////
+///////////// RM Failover related records ////////////////////////
+//////////////////////////////////////////////////////////////////
+message ActiveRMInfoProto {
+ required string clusterId = 1;
+ required string rmId = 2;
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index b2a59b93467..1ef1f4b7f96 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -30,7 +30,13 @@
-
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-api</artifactId>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
index fed26d7f5aa..8900b160dc0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
@@ -24,41 +24,52 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-public class TestRMFailover {
+public class TestRMFailover extends ClientBaseWithFixes {
private static final Log LOG =
LogFactory.getLog(TestRMFailover.class.getName());
+ private static final HAServiceProtocol.StateChangeRequestInfo req =
+ new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
private static final String RM1_NODE_ID = "rm1";
private static final int RM1_PORT_BASE = 10000;
private static final String RM2_NODE_ID = "rm2";
private static final int RM2_PORT_BASE = 20000;
- private static final HAServiceProtocol.StateChangeRequestInfo req =
- new HAServiceProtocol.StateChangeRequestInfo(
- HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED);
- private static Configuration conf;
- private static MiniYARNCluster cluster;
+ private Configuration conf;
+ private MiniYARNCluster cluster;
+ private ApplicationId fakeAppId;
- private static void setConfForRM(String rmId, String prefix, String value) {
+
+ private void setConfForRM(String rmId, String prefix, String value) {
conf.set(HAUtil.addSuffix(prefix, rmId), value);
}
- private static void setRpcAddressForRM(String rmId, int base) {
+ private void setRpcAddressForRM(String rmId, int base) {
setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" +
(base + YarnConfiguration.DEFAULT_RM_PORT));
setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" +
@@ -73,13 +84,9 @@ public class TestRMFailover {
(base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT));
}
- private static AdminService getRMAdminService(int index) {
- return
- cluster.getResourceManager(index).getRMContext().getRMAdminService();
- }
-
- @BeforeClass
- public static void setup() throws IOException {
+ @Before
+ public void setup() throws IOException {
+ fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 0);
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
@@ -87,27 +94,22 @@ public class TestRMFailover {
setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE);
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
+
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
cluster = new MiniYARNCluster(TestRMFailover.class.getName(), 2, 1, 1, 1);
- cluster.init(conf);
- cluster.start();
-
- cluster.getResourceManager(0).getRMContext().getRMAdminService()
- .transitionToActive(req);
- assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
}
- @AfterClass
- public static void teardown() {
+ @After
+ public void teardown() {
cluster.stop();
}
private void verifyClientConnection() {
int numRetries = 3;
while(numRetries-- > 0) {
- Configuration conf = new YarnConfiguration(TestRMFailover.conf);
+ Configuration conf = new YarnConfiguration(this.conf);
YarnClient client = YarnClient.createYarnClient();
client.init(conf);
client.start();
@@ -123,31 +125,131 @@ public class TestRMFailover {
fail("Client couldn't connect to the Active RM");
}
+ private void verifyConnections() throws InterruptedException, YarnException {
+ assertTrue("NMs failed to connect to the RM",
+ cluster.waitForNodeManagersToConnect(20000));
+ verifyClientConnection();
+ }
+
+ private AdminService getAdminService(int index) {
+ return cluster.getResourceManager(index).getRMContext().getRMAdminService();
+ }
+
+ private void explicitFailover() throws IOException {
+ int activeRMIndex = cluster.getActiveRMIndex();
+ int newActiveRMIndex = (activeRMIndex + 1) % 2;
+ getAdminService(activeRMIndex).transitionToStandby(req);
+ getAdminService(newActiveRMIndex).transitionToActive(req);
+ assertEquals("Failover failed", newActiveRMIndex, cluster.getActiveRMIndex());
+ }
+
+ private void failover()
+ throws IOException, InterruptedException, YarnException {
+ int activeRMIndex = cluster.getActiveRMIndex();
+ cluster.stopResourceManager(activeRMIndex);
+ assertEquals("Failover failed",
+ (activeRMIndex + 1) % 2, cluster.getActiveRMIndex());
+ cluster.restartResourceManager(activeRMIndex);
+ }
+
@Test
public void testExplicitFailover()
throws YarnException, InterruptedException, IOException {
- assertTrue("NMs failed to connect to the RM",
- cluster.waitForNodeManagersToConnect(5000));
- verifyClientConnection();
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+ cluster.init(conf);
+ cluster.start();
+ getAdminService(0).transitionToActive(req);
+ assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+ verifyConnections();
- // Failover to the second RM
- getRMAdminService(0).transitionToStandby(req);
- getRMAdminService(1).transitionToActive(req);
- assertEquals("Wrong ResourceManager is active",
- HAServiceProtocol.HAServiceState.ACTIVE,
- getRMAdminService(1).getServiceStatus().getState());
- assertTrue("NMs failed to connect to the RM",
- cluster.waitForNodeManagersToConnect(5000));
- verifyClientConnection();
+ explicitFailover();
+ verifyConnections();
- // Failover back to the first RM
- getRMAdminService(1).transitionToStandby(req);
- getRMAdminService(0).transitionToActive(req);
- assertEquals("Wrong ResourceManager is active",
- HAServiceProtocol.HAServiceState.ACTIVE,
- getRMAdminService(0).getServiceStatus().getState());
- assertTrue("NMs failed to connect to the RM",
- cluster.waitForNodeManagersToConnect(5000));
- verifyClientConnection();
+ explicitFailover();
+ verifyConnections();
+ }
+
+ @Test
+ public void testAutomaticFailover()
+ throws YarnException, InterruptedException, IOException {
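+    // Embedded automatic failover, backed by the test ZooKeeper server
+    // (hostPort is assumed to come from the surrounding test fixture).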
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED, true);
+ conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster");
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+ conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 2000);
+
+ cluster.init(conf);
+ cluster.start();
+ assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+ verifyConnections();
+
+ failover();
+ verifyConnections();
+
+ failover();
+ verifyConnections();
+ }
+
+ @Test
+ public void testWebAppProxyInStandAloneMode() throws YarnException,
+ InterruptedException, IOException {
+ WebAppProxyServer webAppProxyServer = new WebAppProxyServer();
+ try {
+ conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:9099");
+ cluster.init(conf);
+ cluster.start();
+ getAdminService(0).transitionToActive(req);
+ assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+ verifyConnections();
+ webAppProxyServer.init(conf);
+
+ // Start webAppProxyServer
+ Assert.assertEquals(STATE.INITED, webAppProxyServer.getServiceState());
+ webAppProxyServer.start();
+ Assert.assertEquals(STATE.STARTED, webAppProxyServer.getServiceState());
+
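+      // A proxy request for an app id that was never submitted should fail
+      // with ApplicationNotFoundException.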
+ URL wrongUrl = new URL("http://0.0.0.0:9099/proxy/" + fakeAppId);
+ HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
+ .openConnection();
+
+ proxyConn.connect();
+ verifyExpectedException(proxyConn.getResponseMessage());
+
+ explicitFailover();
+ verifyConnections();
+ proxyConn.connect();
+ verifyExpectedException(proxyConn.getResponseMessage());
+ } finally {
+ webAppProxyServer.stop();
+ }
+ }
+
+ @Test
+ public void testEmbeddedWebAppProxy() throws YarnException,
+ InterruptedException, IOException {
+ cluster.init(conf);
+ cluster.start();
+ getAdminService(0).transitionToActive(req);
+ assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
+ verifyConnections();
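+    // Query the embedded proxy for an app id that was never submitted; the
+    // error message should name ApplicationNotFoundException.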
+ URL wrongUrl = new URL("http://0.0.0.0:18088/proxy/" + fakeAppId);
+ HttpURLConnection proxyConn = (HttpURLConnection) wrongUrl
+ .openConnection();
+
+ proxyConn.connect();
+ verifyExpectedException(proxyConn.getResponseMessage());
+
+ explicitFailover();
+ verifyConnections();
+ proxyConn.connect();
+ verifyExpectedException(proxyConn.getResponseMessage());
+ }
+
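+  // Shared check: the proxy's response message must identify the fake app
+  // id as unknown to the RM.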
+  private void verifyExpectedException(String exceptionMessage) {
+ assertTrue(exceptionMessage.contains(ApplicationNotFoundException.class
+ .getName()));
+ assertTrue(exceptionMessage
+ .contains("Application with id '" + fakeAppId + "' " +
+ "doesn't exist in RM."));
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 966995c99ce..00ab7895d67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -378,6 +378,13 @@ public class TestYarnClient {
appId = createApp(rmClient, true);
waitTillAccepted(rmClient, appId);
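+    // The AMRM token may not be available immediately after the app is
+    // accepted, so poll for up to 20 seconds before failing.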
+ long start = System.currentTimeMillis();
+ while (rmClient.getAMRMToken(appId) == null) {
+ if (System.currentTimeMillis() - start > 20 * 1000) {
+ Assert.fail("AMRM token is null");
+ }
+ Thread.sleep(100);
+ }
//unmanaged AMs do return AMRM token
Assert.assertNotNull(rmClient.getAMRMToken(appId));
@@ -392,6 +399,13 @@ public class TestYarnClient {
rmClient.start();
ApplicationId appId = createApp(rmClient, true);
waitTillAccepted(rmClient, appId);
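+    // As above, poll for up to 20 seconds for the AMRM token to appear.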
+ long start = System.currentTimeMillis();
+ while (rmClient.getAMRMToken(appId) == null) {
+ if (System.currentTimeMillis() - start > 20 * 1000) {
+ Assert.fail("AMRM token is null");
+ }
+ Thread.sleep(100);
+ }
//unmanaged AMs do return AMRM token
Assert.assertNotNull(rmClient.getAMRMToken(appId));
return appId;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
similarity index 95%
rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
index 06bbc3555c4..91d0bf7fc92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
@@ -39,16 +39,13 @@ import com.google.common.base.Preconditions;
public class ClientRMProxy<T> extends RMProxy<T> {
private static final Log LOG = LogFactory.getLog(ClientRMProxy.class);
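+  // Eagerly constructed singleton, passed to the shared RMProxy factory in
+  // place of the static INSTANCE field that used to live in RMProxy itself.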
+ private static final ClientRMProxy INSTANCE = new ClientRMProxy();
private interface ClientRMProtocols extends ApplicationClientProtocol,
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
// Add nothing
}
- static {
- INSTANCE = new ClientRMProxy();
- }
-
private ClientRMProxy(){
super();
}
@@ -63,9 +60,7 @@ public class ClientRMProxy extends RMProxy {
*/
public static <T> T createRMProxy(final Configuration configuration,
    final Class<T> protocol) throws IOException {
- // This method exists only to initiate this class' static INSTANCE. TODO:
- // FIX if possible
- return RMProxy.createRMProxy(configuration, protocol);
+ return createRMProxy(configuration, protocol, INSTANCE);
}
private static void setupTokens(InetSocketAddress resourceManagerAddress)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java
index 74cb4992383..659be254162 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java
@@ -21,16 +21,19 @@ package org.apache.hadoop.yarn.client;
import org.apache.hadoop.ha.BadFencingConfigurationException;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.NodeFencer;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.io.IOException;
import java.net.InetSocketAddress;
public class RMHAServiceTarget extends HAServiceTarget {
- private InetSocketAddress haAdminServiceAddress;
+ private final boolean autoFailoverEnabled;
+ private final InetSocketAddress haAdminServiceAddress;
public RMHAServiceTarget(YarnConfiguration conf)
throws IOException {
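+    // Capture the configured failover mode once; isAutoFailoverEnabled()
+    // below simply reports it.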
+ autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
haAdminServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@@ -44,19 +47,23 @@ public class RMHAServiceTarget extends HAServiceTarget {
@Override
public InetSocketAddress getZKFCAddress() {
- // TODO (YARN-1177): Hook up ZKFC information
- return null;
+ // TODO (YARN-1177): ZKFC implementation
+ throw new UnsupportedOperationException("RMHAServiceTarget doesn't have " +
+ "a corresponding ZKFC address");
}
@Override
public NodeFencer getFencer() {
- // TODO (YARN-1026): Hook up fencing implementation
return null;
}
@Override
- public void checkFencingConfigured()
- throws BadFencingConfigurationException {
- // TODO (YARN-1026): Based on fencing implementation
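+  // No fencer is wired up yet (getFencer() returns null), so any operation
+  // that requires fencing must be rejected.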
+ public void checkFencingConfigured() throws BadFencingConfigurationException {
+ throw new BadFencingConfigurationException("Fencer not configured");
+ }
+
+ @Override
+ public boolean isAutoFailoverEnabled() {
+ return autoFailoverEnabled;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
index 913eb04613c..c15018bde8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
@@ -50,7 +50,6 @@ import com.google.common.annotations.VisibleForTesting;
public class RMProxy<T> {
private static final Log LOG = LogFactory.getLog(RMProxy.class);
- protected static RMProxy INSTANCE;
protected RMProxy() {}
@@ -79,17 +78,17 @@ public class RMProxy