Merging r1526366 through r1526708 from trunk to branch HDFS-2832
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1526717 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
8062d8c239
|
@ -333,6 +333,9 @@ Release 2.3.0 - UNRELEASED
|
||||||
HADOOP-9915. o.a.h.fs.Stat support on Mac OS X (Binglin Chang via Colin
|
HADOOP-9915. o.a.h.fs.Stat support on Mac OS X (Binglin Chang via Colin
|
||||||
Patrick McCabe)
|
Patrick McCabe)
|
||||||
|
|
||||||
|
HADOOP-9998. Provide methods to clear only part of the DNSToSwitchMapping.
|
||||||
|
(Junping Du via Colin Patrick McCabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
|
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
|
||||||
|
@ -392,6 +395,12 @@ Release 2.1.2 - UNRELEASED
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
HADOOP-9948. Add a config value to CLITestHelper to skip tests on Windows.
|
||||||
|
(Chuan Liu via cnauroth)
|
||||||
|
|
||||||
|
HADOOP-9976. Different versions of avro and avro-maven-plugin (Karthik
|
||||||
|
Kambatla via Sandy Ryza)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -154,4 +154,11 @@ public class CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping {
|
||||||
public void reloadCachedMappings() {
|
public void reloadCachedMappings() {
|
||||||
cache.clear();
|
cache.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reloadCachedMappings(List<String> names) {
|
||||||
|
for (String name : names) {
|
||||||
|
cache.remove(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,4 +59,12 @@ public interface DNSToSwitchMapping {
|
||||||
* will get a chance to see the new data.
|
* will get a chance to see the new data.
|
||||||
*/
|
*/
|
||||||
public void reloadCachedMappings();
|
public void reloadCachedMappings();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reload cached mappings on specific nodes.
|
||||||
|
*
|
||||||
|
* If there is a cache on these nodes, this method will clear it, so that
|
||||||
|
* future accesses will see updated data.
|
||||||
|
*/
|
||||||
|
public void reloadCachedMappings(List<String> names);
|
||||||
}
|
}
|
||||||
|
|
|
@ -269,5 +269,11 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
|
||||||
// Nothing to do here, since RawScriptBasedMapping has no cache, and
|
// Nothing to do here, since RawScriptBasedMapping has no cache, and
|
||||||
// does not inherit from CachedDNSToSwitchMapping
|
// does not inherit from CachedDNSToSwitchMapping
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reloadCachedMappings(List<String> names) {
|
||||||
|
// Nothing to do here, since RawScriptBasedMapping has no cache, and
|
||||||
|
// does not inherit from CachedDNSToSwitchMapping
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,5 +162,12 @@ public class TableMapping extends CachedDNSToSwitchMapping {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reloadCachedMappings(List<String> names) {
|
||||||
|
// TableMapping has to reload all mappings at once, so no chance to
|
||||||
|
// reload mappings on specific nodes
|
||||||
|
reloadCachedMappings();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,11 +21,10 @@ package org.apache.hadoop.cli;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.cli.util.*;
|
import org.apache.hadoop.cli.util.*;
|
||||||
import org.apache.hadoop.cli.util.CLITestCmd;
|
|
||||||
import org.apache.hadoop.cli.util.CLICommand;
|
|
||||||
import org.apache.hadoop.cli.util.CommandExecutor.Result;
|
import org.apache.hadoop.cli.util.CommandExecutor.Result;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
@ -369,6 +368,7 @@ public class CLITestHelper {
|
||||||
CLITestData td = null;
|
CLITestData td = null;
|
||||||
ArrayList<CLICommand> testCommands = null;
|
ArrayList<CLICommand> testCommands = null;
|
||||||
ArrayList<CLICommand> cleanupCommands = null;
|
ArrayList<CLICommand> cleanupCommands = null;
|
||||||
|
boolean runOnWindows = true;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startDocument() throws SAXException {
|
public void startDocument() throws SAXException {
|
||||||
|
@ -399,6 +399,8 @@ public class CLITestHelper {
|
||||||
throws SAXException {
|
throws SAXException {
|
||||||
if (qName.equals("description")) {
|
if (qName.equals("description")) {
|
||||||
td.setTestDesc(charString);
|
td.setTestDesc(charString);
|
||||||
|
} else if (qName.equals("windows")) {
|
||||||
|
runOnWindows = Boolean.parseBoolean(charString);
|
||||||
} else if (qName.equals("test-commands")) {
|
} else if (qName.equals("test-commands")) {
|
||||||
td.setTestCommands(testCommands);
|
td.setTestCommands(testCommands);
|
||||||
testCommands = null;
|
testCommands = null;
|
||||||
|
@ -420,8 +422,11 @@ public class CLITestHelper {
|
||||||
} else if (qName.equals("expected-output")) {
|
} else if (qName.equals("expected-output")) {
|
||||||
comparatorData.setExpectedOutput(charString);
|
comparatorData.setExpectedOutput(charString);
|
||||||
} else if (qName.equals("test")) {
|
} else if (qName.equals("test")) {
|
||||||
testsFromConfigFile.add(td);
|
if (!Shell.WINDOWS || runOnWindows) {
|
||||||
|
testsFromConfigFile.add(td);
|
||||||
|
}
|
||||||
td = null;
|
td = null;
|
||||||
|
runOnWindows = true;
|
||||||
} else if (qName.equals("mode")) {
|
} else if (qName.equals("mode")) {
|
||||||
testMode = charString;
|
testMode = charString;
|
||||||
if (!testMode.equals(TESTMODE_NOCOMPARE) &&
|
if (!testMode.equals(TESTMODE_NOCOMPARE) &&
|
||||||
|
|
|
@ -152,4 +152,10 @@ public class StaticMapping extends AbstractDNSToSwitchMapping {
|
||||||
// reloadCachedMappings does nothing for StaticMapping; there is
|
// reloadCachedMappings does nothing for StaticMapping; there is
|
||||||
// nowhere to reload from since all data is in memory.
|
// nowhere to reload from since all data is in memory.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reloadCachedMappings(List<String> names) {
|
||||||
|
// reloadCachedMappings does nothing for StaticMapping; there is
|
||||||
|
// nowhere to reload from since all data is in memory.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,5 +120,9 @@ public class TestSwitchMapping extends Assert {
|
||||||
@Override
|
@Override
|
||||||
public void reloadCachedMappings() {
|
public void reloadCachedMappings() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reloadCachedMappings(List<String> names) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,6 +120,9 @@ Trunk (Unreleased)
|
||||||
|
|
||||||
HDFS-4904. Remove JournalService. (Arpit Agarwal via cnauroth)
|
HDFS-4904. Remove JournalService. (Arpit Agarwal via cnauroth)
|
||||||
|
|
||||||
|
HDFS-5041. Add the time of last heartbeat to dead server Web UI (Shinichi
|
||||||
|
Yamashita via brandonli)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -345,6 +348,12 @@ Release 2.1.2 - UNRELEASED
|
||||||
HDFS-5251. Race between the initialization of NameNode and the http
|
HDFS-5251. Race between the initialization of NameNode and the http
|
||||||
server. (Haohui Mai via suresh)
|
server. (Haohui Mai via suresh)
|
||||||
|
|
||||||
|
HDFS-5258. Skip tests in TestHDFSCLI that are not applicable on Windows.
|
||||||
|
(Chuan Liu via cnauroth)
|
||||||
|
|
||||||
|
HDFS-5186. TestFileJournalManager fails on Windows due to file handle leaks.
|
||||||
|
(Chuan Liu via cnauroth)
|
||||||
|
|
||||||
Release 2.1.1-beta - 2013-09-23
|
Release 2.1.1-beta - 2013-09-23
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -886,7 +886,12 @@ public class DatanodeManager {
|
||||||
// If the network location is invalid, clear the cached mappings
|
// If the network location is invalid, clear the cached mappings
|
||||||
// so that we have a chance to re-add this DataNode with the
|
// so that we have a chance to re-add this DataNode with the
|
||||||
// correct network location later.
|
// correct network location later.
|
||||||
dnsToSwitchMapping.reloadCachedMappings();
|
List<String> invalidNodeNames = new ArrayList<String>(3);
|
||||||
|
// clear cache for nodes in IP or Hostname
|
||||||
|
invalidNodeNames.add(nodeReg.getIpAddr());
|
||||||
|
invalidNodeNames.add(nodeReg.getHostName());
|
||||||
|
invalidNodeNames.add(nodeReg.getPeerHostName());
|
||||||
|
dnsToSwitchMapping.reloadCachedMappings(invalidNodeNames);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.net.URLEncoder;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Date;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -786,9 +787,13 @@ class NamenodeJspHelper {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
generateNodeDataHeader(out, d, suffix, alive, nnHttpPort, nnaddr);
|
generateNodeDataHeader(out, d, suffix, alive, nnHttpPort, nnaddr);
|
||||||
|
long currentTime = Time.now();
|
||||||
|
long timestamp = d.getLastUpdate();
|
||||||
if (!alive) {
|
if (!alive) {
|
||||||
out.print("<td class=\"decommissioned\"> " +
|
out.print("<td class=\"lastcontact\"> "
|
||||||
d.isDecommissioned() + "\n");
|
+ new Date(timestamp)
|
||||||
|
+ "<td class=\"decommissioned\"> "
|
||||||
|
+ d.isDecommissioned() + "\n");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -801,9 +806,6 @@ class NamenodeJspHelper {
|
||||||
String percentRemaining = fraction2String(d.getRemainingPercent());
|
String percentRemaining = fraction2String(d.getRemainingPercent());
|
||||||
|
|
||||||
String adminState = d.getAdminState().toString();
|
String adminState = d.getAdminState().toString();
|
||||||
|
|
||||||
long timestamp = d.getLastUpdate();
|
|
||||||
long currentTime = Time.now();
|
|
||||||
|
|
||||||
long bpUsed = d.getBlockPoolUsed();
|
long bpUsed = d.getBlockPoolUsed();
|
||||||
String percentBpUsed = fraction2String(d.getBlockPoolUsedPercent());
|
String percentBpUsed = fraction2String(d.getBlockPoolUsedPercent());
|
||||||
|
@ -954,6 +956,8 @@ class NamenodeJspHelper {
|
||||||
+ "<th " + nodeHeaderStr("node")
|
+ "<th " + nodeHeaderStr("node")
|
||||||
+ "> Node <th " + nodeHeaderStr("address")
|
+ "> Node <th " + nodeHeaderStr("address")
|
||||||
+ "> Transferring<br>Address <th "
|
+ "> Transferring<br>Address <th "
|
||||||
|
+ nodeHeaderStr("lastcontact")
|
||||||
|
+ "> Last <br>Contact <th "
|
||||||
+ nodeHeaderStr("decommissioned")
|
+ nodeHeaderStr("decommissioned")
|
||||||
+ "> Decommissioned\n");
|
+ "> Decommissioned\n");
|
||||||
|
|
||||||
|
|
|
@ -86,24 +86,27 @@ public class TestFileJournalManager {
|
||||||
EditLogInputStream elis = null;
|
EditLogInputStream elis = null;
|
||||||
try {
|
try {
|
||||||
while ((elis = allStreams.poll()) != null) {
|
while ((elis = allStreams.poll()) != null) {
|
||||||
elis.skipUntil(txId);
|
try {
|
||||||
while (true) {
|
elis.skipUntil(txId);
|
||||||
FSEditLogOp op = elis.readOp();
|
while (true) {
|
||||||
if (op == null) {
|
FSEditLogOp op = elis.readOp();
|
||||||
break;
|
if (op == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (abortOnGap && (op.getTransactionId() != txId)) {
|
||||||
|
LOG.info("getNumberOfTransactions: detected gap at txId "
|
||||||
|
+ fromTxId);
|
||||||
|
return numTransactions;
|
||||||
|
}
|
||||||
|
txId = op.getTransactionId() + 1;
|
||||||
|
numTransactions++;
|
||||||
}
|
}
|
||||||
if (abortOnGap && (op.getTransactionId() != txId)) {
|
} finally {
|
||||||
LOG.info("getNumberOfTransactions: detected gap at txId " +
|
IOUtils.cleanup(LOG, elis);
|
||||||
fromTxId);
|
|
||||||
return numTransactions;
|
|
||||||
}
|
|
||||||
txId = op.getTransactionId() + 1;
|
|
||||||
numTransactions++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
|
IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
|
||||||
IOUtils.cleanup(LOG, elis);
|
|
||||||
}
|
}
|
||||||
return numTransactions;
|
return numTransactions;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1043,6 +1043,7 @@
|
||||||
|
|
||||||
<test> <!-- TESTED -->
|
<test> <!-- TESTED -->
|
||||||
<description>ls: Negative test for quoted /*/* globbing </description>
|
<description>ls: Negative test for quoted /*/* globbing </description>
|
||||||
|
<windows>false</windows>
|
||||||
<test-commands>
|
<test-commands>
|
||||||
<command>-fs NAMENODE -mkdir /dir0</command>
|
<command>-fs NAMENODE -mkdir /dir0</command>
|
||||||
<command>-fs NAMENODE -mkdir /dir0/dir1</command>
|
<command>-fs NAMENODE -mkdir /dir0/dir1</command>
|
||||||
|
@ -1062,6 +1063,7 @@
|
||||||
|
|
||||||
<test> <!-- TESTED -->
|
<test> <!-- TESTED -->
|
||||||
<description>ls: Test for quoted globbing </description>
|
<description>ls: Test for quoted globbing </description>
|
||||||
|
<windows>false</windows>
|
||||||
<test-commands>
|
<test-commands>
|
||||||
<command>-fs NAMENODE -mkdir /dir0</command>
|
<command>-fs NAMENODE -mkdir /dir0</command>
|
||||||
<command>-fs NAMENODE -mkdir /dir0/\*</command>
|
<command>-fs NAMENODE -mkdir /dir0/\*</command>
|
||||||
|
@ -1082,6 +1084,7 @@
|
||||||
|
|
||||||
<test> <!-- TESTED -->
|
<test> <!-- TESTED -->
|
||||||
<description>rm: Test for quoted globbing </description>
|
<description>rm: Test for quoted globbing </description>
|
||||||
|
<windows>false</windows>
|
||||||
<test-commands>
|
<test-commands>
|
||||||
<command>-fs NAMENODE -mkdir /dir0</command>
|
<command>-fs NAMENODE -mkdir /dir0</command>
|
||||||
<command>-fs NAMENODE -mkdir /dir0/\*</command>
|
<command>-fs NAMENODE -mkdir /dir0/\*</command>
|
||||||
|
|
|
@ -221,6 +221,15 @@ Release 2.1.2 - UNRELEASED
|
||||||
|
|
||||||
MAPREDUCE-5503. Fixed a test issue in TestMRJobClient. (Jian He via vinodkv)
|
MAPREDUCE-5503. Fixed a test issue in TestMRJobClient. (Jian He via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-5170. Fixed a wrong log message in CombineFileInputFormat class.
|
||||||
|
(Sangjin Lee via vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-5525. Increase timeout of TestDFSIO.testAppend and
|
||||||
|
TestMRJobsWithHistoryService.testJobHistoryData. (Chuan Liu via cnauroth)
|
||||||
|
|
||||||
|
MAPREDUCE-5513. ConcurrentModificationException in JobControl (Robert
|
||||||
|
Parker via jlowe)
|
||||||
|
|
||||||
Release 2.1.1-beta - 2013-09-23
|
Release 2.1.1-beta - 2013-09-23
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -1403,6 +1412,9 @@ Release 0.23.10 - UNRELEASED
|
||||||
MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta
|
MAPREDUCE-5504. mapred queue -info inconsistent with types (Kousuke Saruta
|
||||||
via tgraves)
|
via tgraves)
|
||||||
|
|
||||||
|
MAPREDUCE-5513. ConcurrentModificationException in JobControl (Robert
|
||||||
|
Parker via jlowe)
|
||||||
|
|
||||||
Release 0.23.9 - 2013-07-08
|
Release 0.23.9 - 2013-07-08
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -203,13 +203,13 @@ public abstract class CombineFileInputFormat<K, V>
|
||||||
maxSize);
|
maxSize);
|
||||||
}
|
}
|
||||||
if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
|
if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
|
||||||
throw new IOException("Minimum split size per rack" + minSizeRack +
|
throw new IOException("Minimum split size per rack " + minSizeRack +
|
||||||
" cannot be larger than maximum split size " +
|
" cannot be larger than maximum split size " +
|
||||||
maxSize);
|
maxSize);
|
||||||
}
|
}
|
||||||
if (minSizeRack != 0 && minSizeNode > minSizeRack) {
|
if (minSizeRack != 0 && minSizeNode > minSizeRack) {
|
||||||
throw new IOException("Minimum split size per node" + minSizeNode +
|
throw new IOException("Minimum split size per node " + minSizeNode +
|
||||||
" cannot be smaller than minimum split " +
|
" cannot be larger than minimum split " +
|
||||||
"size per rack " + minSizeRack);
|
"size per rack " + minSizeRack);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -79,13 +79,11 @@ public class JobControl implements Runnable {
|
||||||
this.runnerState = ThreadState.READY;
|
this.runnerState = ThreadState.READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<ControlledJob> toList(
|
synchronized private static List<ControlledJob> toList(
|
||||||
LinkedList<ControlledJob> jobs) {
|
LinkedList<ControlledJob> jobs) {
|
||||||
ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
|
ArrayList<ControlledJob> retv = new ArrayList<ControlledJob>();
|
||||||
synchronized (jobs) {
|
for (ControlledJob job : jobs) {
|
||||||
for (ControlledJob job : jobs) {
|
retv.add(job);
|
||||||
retv.add(job);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return retv;
|
return retv;
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,6 +98,10 @@ public class TestJobHistoryParsing {
|
||||||
@Override
|
@Override
|
||||||
public void reloadCachedMappings() {
|
public void reloadCachedMappings() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reloadCachedMappings(List<String> names) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 50000)
|
@Test(timeout = 50000)
|
||||||
|
|
|
@ -269,7 +269,7 @@ public class TestDFSIO implements Tool {
|
||||||
bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime);
|
bench.analyzeResult(fs, TestType.TEST_TYPE_READ_SKIP, execTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 3000)
|
@Test (timeout = 6000)
|
||||||
public void testAppend() throws Exception {
|
public void testAppend() throws Exception {
|
||||||
FileSystem fs = cluster.getFileSystem();
|
FileSystem fs = cluster.getFileSystem();
|
||||||
long tStart = System.currentTimeMillis();
|
long tStart = System.currentTimeMillis();
|
||||||
|
|
|
@ -111,7 +111,7 @@ public class TestMRJobsWithHistoryService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 30000)
|
@Test (timeout = 90000)
|
||||||
public void testJobHistoryData() throws IOException, InterruptedException,
|
public void testJobHistoryData() throws IOException, InterruptedException,
|
||||||
AvroRemoteException, ClassNotFoundException {
|
AvroRemoteException, ClassNotFoundException {
|
||||||
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
|
||||||
|
|
|
@ -59,6 +59,9 @@
|
||||||
<hadoop.common.build.dir>${basedir}/../../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
|
<hadoop.common.build.dir>${basedir}/../../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
|
||||||
<java.security.egd>file:///dev/urandom</java.security.egd>
|
<java.security.egd>file:///dev/urandom</java.security.egd>
|
||||||
|
|
||||||
|
<!-- avro version -->
|
||||||
|
<avro.version>1.7.4</avro.version>
|
||||||
|
|
||||||
<!-- jersey version -->
|
<!-- jersey version -->
|
||||||
<jersey.version>1.9</jersey.version>
|
<jersey.version>1.9</jersey.version>
|
||||||
|
|
||||||
|
@ -630,7 +633,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.avro</groupId>
|
<groupId>org.apache.avro</groupId>
|
||||||
<artifactId>avro</artifactId>
|
<artifactId>avro</artifactId>
|
||||||
<version>1.7.4</version>
|
<version>${avro.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>net.sf.kosmosfs</groupId>
|
<groupId>net.sf.kosmosfs</groupId>
|
||||||
|
@ -798,7 +801,7 @@
|
||||||
<plugin>
|
<plugin>
|
||||||
<groupId>org.apache.avro</groupId>
|
<groupId>org.apache.avro</groupId>
|
||||||
<artifactId>avro-maven-plugin</artifactId>
|
<artifactId>avro-maven-plugin</artifactId>
|
||||||
<version>1.5.3</version>
|
<version>${avro.version}</version>
|
||||||
</plugin>
|
</plugin>
|
||||||
<plugin>
|
<plugin>
|
||||||
<groupId>org.codehaus.mojo.jspc</groupId>
|
<groupId>org.codehaus.mojo.jspc</groupId>
|
||||||
|
|
|
@ -36,6 +36,9 @@ Release 2.3.0 - UNRELEASED
|
||||||
YARN-353. Add Zookeeper-based store implementation for RMStateStore.
|
YARN-353. Add Zookeeper-based store implementation for RMStateStore.
|
||||||
(Bikas Saha, Jian He and Karthik Kambatla via hitesh)
|
(Bikas Saha, Jian He and Karthik Kambatla via hitesh)
|
||||||
|
|
||||||
|
YARN-819. ResourceManager and NodeManager should check for a minimum allowed
|
||||||
|
version (Robert Parker via jeagles)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -83,6 +86,9 @@ Release 2.1.2 - UNRELEASED
|
||||||
YARN-49. Improve distributed shell application to work on a secure cluster.
|
YARN-49. Improve distributed shell application to work on a secure cluster.
|
||||||
(Vinod Kumar Vavilapalli via hitesh)
|
(Vinod Kumar Vavilapalli via hitesh)
|
||||||
|
|
||||||
|
YARN-1157. Fixed ResourceManager UI to behave correctly when apps like
|
||||||
|
distributed-shell do not set tracking urls. (Xuan Gong via vinodkv)
|
||||||
|
|
||||||
Release 2.1.1-beta - 2013-09-23
|
Release 2.1.1-beta - 2013-09-23
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -100,11 +100,22 @@ public abstract class FinishApplicationMasterRequest {
|
||||||
public abstract String getTrackingUrl();
|
public abstract String getTrackingUrl();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the <em>tracking URL</em>for the <code>ApplicationMaster</code>
|
* Set the <em>final tracking URL</em>for the <code>ApplicationMaster</code>.
|
||||||
* This url if contains scheme then that will be used by resource manager
|
* This is the web-URL to which ResourceManager or web-application proxy will
|
||||||
* web application proxy otherwise it will default to http.
|
* redirect client/users once the application is finished and the
|
||||||
* @param url <em>tracking URL</em>for the
|
* <code>ApplicationMaster</code> is gone.
|
||||||
* <code>ApplicationMaster</code>
|
* <p>
|
||||||
|
* If the passed url has a scheme then that will be used by the
|
||||||
|
* ResourceManager and web-application proxy, otherwise the scheme will
|
||||||
|
* default to http.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* Empty, null, "N/A" strings are all valid besides a real URL. In case an url
|
||||||
|
* isn't explicitly passed, it defaults to "N/A" on the ResourceManager.
|
||||||
|
* <p>
|
||||||
|
*
|
||||||
|
* @param url
|
||||||
|
* <em>tracking URL</em>for the <code>ApplicationMaster</code>
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
|
|
|
@ -112,11 +112,22 @@ public abstract class RegisterApplicationMasterRequest {
|
||||||
public abstract String getTrackingUrl();
|
public abstract String getTrackingUrl();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the <em>tracking URL</em> for the <code>ApplicationMaster</code>.
|
* Set the <em>tracking URL</em>for the <code>ApplicationMaster</code> while
|
||||||
* This url if contains scheme then that will be used by resource manager
|
* it is running. This is the web-URL to which ResourceManager or
|
||||||
* web application proxy otherwise it will default to http.
|
* web-application proxy will redirect client/users while the application and
|
||||||
* @param trackingUrl <em>tracking URL</em> for the
|
* the <code>ApplicationMaster</code> are still running.
|
||||||
* <code>ApplicationMaster</code>
|
* <p>
|
||||||
|
* If the passed url has a scheme then that will be used by the
|
||||||
|
* ResourceManager and web-application proxy, otherwise the scheme will
|
||||||
|
* default to http.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* Empty, null, "N/A" strings are all valid besides a real URL. In case an url
|
||||||
|
* isn't explicitly passed, it defaults to "N/A" on the ResourceManager.
|
||||||
|
* <p>
|
||||||
|
*
|
||||||
|
* @param trackingUrl
|
||||||
|
* <em>tracking URL</em>for the <code>ApplicationMaster</code>
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Stable
|
@Stable
|
||||||
|
|
|
@ -362,6 +362,13 @@ public class YarnConfiguration extends Configuration {
|
||||||
|
|
||||||
public static final long DEFAULT_RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS =
|
public static final long DEFAULT_RM_NMTOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS =
|
||||||
24 * 60 * 60;
|
24 * 60 * 60;
|
||||||
|
|
||||||
|
public static final String RM_NODEMANAGER_MINIMUM_VERSION =
|
||||||
|
RM_PREFIX + "nodemanager.minimum.version";
|
||||||
|
|
||||||
|
public static final String DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION =
|
||||||
|
"NONE";
|
||||||
|
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
// Node Manager Configs
|
// Node Manager Configs
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
|
@ -460,6 +467,10 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
|
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
|
||||||
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
|
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
|
||||||
|
|
||||||
|
public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION =
|
||||||
|
NM_PREFIX + "resourcemanager.minimum.version";
|
||||||
|
public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";
|
||||||
|
|
||||||
/** Interval at which the delayed token removal thread runs */
|
/** Interval at which the delayed token removal thread runs */
|
||||||
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
|
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
|
||||||
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
|
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
|
||||||
|
|
|
@ -215,6 +215,10 @@ public class TestAMRMClientContainerRequest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reloadCachedMappings() {}
|
public void reloadCachedMappings() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reloadCachedMappings(List<String> names) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyResourceRequest(
|
private void verifyResourceRequest(
|
||||||
|
|
|
@ -358,6 +358,14 @@
|
||||||
<value>1000</value>
|
<value>1000</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The minimum allowed version of a connecting nodemanager. The valid values are
|
||||||
|
NONE (no version checking), EqualToRM (the nodemanager's version is equal to
|
||||||
|
or greater than the RM version), or a Version String.</description>
|
||||||
|
<name>yarn.resourcemanager.nodemanager.minimum.version</name>
|
||||||
|
<value>NONE</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Enable a set of periodic monitors (specified in
|
<description>Enable a set of periodic monitors (specified in
|
||||||
yarn.resourcemanager.scheduler.monitor.policies) that affect the
|
yarn.resourcemanager.scheduler.monitor.policies) that affect the
|
||||||
|
@ -737,6 +745,14 @@
|
||||||
<value>30</value>
|
<value>30</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>The minimum allowed version of a resourcemanager that a nodemanager will connect to.
|
||||||
|
The valid values are NONE (no version checking), EqualToNM (the resourcemanager's version is
|
||||||
|
equal to or greater than the NM version), or a Version String.</description>
|
||||||
|
<name>yarn.nodemanager.resourcemanager.minimum.version</name>
|
||||||
|
<value>NONE</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>Max number of threads in NMClientAsync to process container
|
<description>Max number of threads in NMClientAsync to process container
|
||||||
management events</description>
|
management events</description>
|
||||||
|
|
|
@ -67,6 +67,10 @@ public class TestRackResolver {
|
||||||
public void reloadCachedMappings() {
|
public void reloadCachedMappings() {
|
||||||
// nothing to do here, since RawScriptBasedMapping has no cache.
|
// nothing to do here, since RawScriptBasedMapping has no cache.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reloadCachedMappings(List<String> names) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -25,8 +25,10 @@ public interface RegisterNodeManagerRequest {
|
||||||
NodeId getNodeId();
|
NodeId getNodeId();
|
||||||
int getHttpPort();
|
int getHttpPort();
|
||||||
Resource getResource();
|
Resource getResource();
|
||||||
|
String getNMVersion();
|
||||||
|
|
||||||
void setNodeId(NodeId nodeId);
|
void setNodeId(NodeId nodeId);
|
||||||
void setHttpPort(int port);
|
void setHttpPort(int port);
|
||||||
void setResource(Resource resource);
|
void setResource(Resource resource);
|
||||||
|
void setNMVersion(String version);
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,4 +42,7 @@ public interface RegisterNodeManagerResponse {
|
||||||
|
|
||||||
void setDiagnosticsMessage(String diagnosticsMessage);
|
void setDiagnosticsMessage(String diagnosticsMessage);
|
||||||
|
|
||||||
|
void setRMVersion(String version);
|
||||||
|
|
||||||
|
String getRMVersion();
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,6 +139,21 @@ public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeMana
|
||||||
builder.setHttpPort(httpPort);
|
builder.setHttpPort(httpPort);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getNMVersion() {
|
||||||
|
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (!p.hasNmVersion()) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
return (p.getNmVersion());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNMVersion(String version) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.setNmVersion(version);
|
||||||
|
}
|
||||||
|
|
||||||
private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
|
private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
|
||||||
return new NodeIdPBImpl(p);
|
return new NodeIdPBImpl(p);
|
||||||
}
|
}
|
||||||
|
|
|
@ -150,6 +150,25 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
||||||
builder.setDiagnosticsMessage((diagnosticsMessage));
|
builder.setDiagnosticsMessage((diagnosticsMessage));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getRMVersion() {
|
||||||
|
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (!p.hasRmVersion()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return p.getRmVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setRMVersion(String rmVersion) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (rmVersion == null) {
|
||||||
|
builder.clearRmIdentifier();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
builder.setRmVersion(rmVersion);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NodeAction getNodeAction() {
|
public NodeAction getNodeAction() {
|
||||||
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
|
|
@ -29,6 +29,7 @@ message RegisterNodeManagerRequestProto {
|
||||||
optional NodeIdProto node_id = 1;
|
optional NodeIdProto node_id = 1;
|
||||||
optional int32 http_port = 3;
|
optional int32 http_port = 3;
|
||||||
optional ResourceProto resource = 4;
|
optional ResourceProto resource = 4;
|
||||||
|
optional string nm_version = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RegisterNodeManagerResponseProto {
|
message RegisterNodeManagerResponseProto {
|
||||||
|
@ -37,6 +38,7 @@ message RegisterNodeManagerResponseProto {
|
||||||
optional NodeActionProto nodeAction = 3;
|
optional NodeActionProto nodeAction = 3;
|
||||||
optional int64 rm_identifier = 4;
|
optional int64 rm_identifier = 4;
|
||||||
optional string diagnostics_message = 5;
|
optional string diagnostics_message = 5;
|
||||||
|
optional string rm_version = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NodeHeartbeatRequestProto {
|
message NodeHeartbeatRequestProto {
|
||||||
|
@ -45,7 +47,6 @@ message NodeHeartbeatRequestProto {
|
||||||
optional MasterKeyProto last_known_nm_token_master_key = 3;
|
optional MasterKeyProto last_known_nm_token_master_key = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
message NodeHeartbeatResponseProto {
|
message NodeHeartbeatResponseProto {
|
||||||
optional int32 response_id = 1;
|
optional int32 response_id = 1;
|
||||||
optional MasterKeyProto container_token_master_key = 2;
|
optional MasterKeyProto container_token_master_key = 2;
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.util.VersionUtil;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -84,6 +86,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
private ResourceTracker resourceTracker;
|
private ResourceTracker resourceTracker;
|
||||||
private Resource totalResource;
|
private Resource totalResource;
|
||||||
private int httpPort;
|
private int httpPort;
|
||||||
|
private String nodeManagerVersionId;
|
||||||
|
private String minimumResourceManagerVersion;
|
||||||
private volatile boolean isStopped;
|
private volatile boolean isStopped;
|
||||||
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
private boolean tokenKeepAliveEnabled;
|
private boolean tokenKeepAliveEnabled;
|
||||||
|
@ -138,6 +142,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
this.tokenRemovalDelayMs =
|
this.tokenRemovalDelayMs =
|
||||||
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
|
||||||
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
|
||||||
|
|
||||||
|
this.minimumResourceManagerVersion = conf.get(
|
||||||
|
YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION,
|
||||||
|
YarnConfiguration.DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION);
|
||||||
|
|
||||||
// Default duration to track stopped containers on nodemanager is 10Min.
|
// Default duration to track stopped containers on nodemanager is 10Min.
|
||||||
// This should not be assigned very large value as it will remember all the
|
// This should not be assigned very large value as it will remember all the
|
||||||
|
@ -168,6 +176,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
// NodeManager is the last service to start, so NodeId is available.
|
// NodeManager is the last service to start, so NodeId is available.
|
||||||
this.nodeId = this.context.getNodeId();
|
this.nodeId = this.context.getNodeId();
|
||||||
this.httpPort = this.context.getHttpPort();
|
this.httpPort = this.context.getHttpPort();
|
||||||
|
this.nodeManagerVersionId = YarnVersionInfo.getVersion();
|
||||||
try {
|
try {
|
||||||
// Registration has to be in start so that ContainerManager can get the
|
// Registration has to be in start so that ContainerManager can get the
|
||||||
// perNM tokens needed to authenticate ContainerTokens.
|
// perNM tokens needed to authenticate ContainerTokens.
|
||||||
|
@ -235,6 +244,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
request.setHttpPort(this.httpPort);
|
request.setHttpPort(this.httpPort);
|
||||||
request.setResource(this.totalResource);
|
request.setResource(this.totalResource);
|
||||||
request.setNodeId(this.nodeId);
|
request.setNodeId(this.nodeId);
|
||||||
|
request.setNMVersion(this.nodeManagerVersionId);
|
||||||
RegisterNodeManagerResponse regNMResponse =
|
RegisterNodeManagerResponse regNMResponse =
|
||||||
resourceTracker.registerNodeManager(request);
|
resourceTracker.registerNodeManager(request);
|
||||||
this.rmIdentifier = regNMResponse.getRMIdentifier();
|
this.rmIdentifier = regNMResponse.getRMIdentifier();
|
||||||
|
@ -248,6 +258,26 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
+ message);
|
+ message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if ResourceManager version is too old then shutdown
|
||||||
|
if (!minimumResourceManagerVersion.equals("NONE")){
|
||||||
|
if (minimumResourceManagerVersion.equals("EqualToNM")){
|
||||||
|
minimumResourceManagerVersion = nodeManagerVersionId;
|
||||||
|
}
|
||||||
|
String rmVersion = regNMResponse.getRMVersion();
|
||||||
|
if (rmVersion == null) {
|
||||||
|
String message = "The Resource Manager's did not return a version. "
|
||||||
|
+ "Valid version cannot be checked.";
|
||||||
|
throw new YarnRuntimeException("Shutting down the Node Manager. "
|
||||||
|
+ message);
|
||||||
|
}
|
||||||
|
if (VersionUtil.compareVersions(rmVersion,minimumResourceManagerVersion) < 0) {
|
||||||
|
String message = "The Resource Manager's version ("
|
||||||
|
+ rmVersion +") is less than the minimum "
|
||||||
|
+ "allowed version " + minimumResourceManagerVersion;
|
||||||
|
throw new YarnRuntimeException("Shutting down the Node Manager on RM "
|
||||||
|
+ "version error, " + message);
|
||||||
|
}
|
||||||
|
}
|
||||||
MasterKey masterKey = regNMResponse.getContainerTokenMasterKey();
|
MasterKey masterKey = regNMResponse.getContainerTokenMasterKey();
|
||||||
// do this now so that its set before we start heartbeating to RM
|
// do this now so that its set before we start heartbeating to RM
|
||||||
// It is expected that status updater is started by this point and
|
// It is expected that status updater is started by this point and
|
||||||
|
|
|
@ -145,7 +145,7 @@ public class TestNodeStatusUpdater {
|
||||||
.byteValue() }));
|
.byteValue() }));
|
||||||
return masterKey;
|
return masterKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
private class MyResourceTracker implements ResourceTracker {
|
private class MyResourceTracker implements ResourceTracker {
|
||||||
|
|
||||||
private final Context context;
|
private final Context context;
|
||||||
|
@ -471,6 +471,7 @@ public class TestNodeStatusUpdater {
|
||||||
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
|
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
|
||||||
public NodeAction registerNodeAction = NodeAction.NORMAL;
|
public NodeAction registerNodeAction = NodeAction.NORMAL;
|
||||||
public String shutDownMessage = "";
|
public String shutDownMessage = "";
|
||||||
|
public String rmVersion = "3.0.1";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RegisterNodeManagerResponse registerNodeManager(
|
public RegisterNodeManagerResponse registerNodeManager(
|
||||||
|
@ -483,6 +484,7 @@ public class TestNodeStatusUpdater {
|
||||||
response.setContainerTokenMasterKey(createMasterKey());
|
response.setContainerTokenMasterKey(createMasterKey());
|
||||||
response.setNMTokenMasterKey(createMasterKey());
|
response.setNMTokenMasterKey(createMasterKey());
|
||||||
response.setDiagnosticsMessage(shutDownMessage);
|
response.setDiagnosticsMessage(shutDownMessage);
|
||||||
|
response.setRMVersion(rmVersion);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
|
@ -1180,6 +1182,44 @@ public class TestNodeStatusUpdater {
|
||||||
" connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
|
" connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRMVersionLessThanMinimum() throws InterruptedException {
|
||||||
|
final AtomicInteger numCleanups = new AtomicInteger(0);
|
||||||
|
YarnConfiguration conf = createNMConfig();
|
||||||
|
conf.set(YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, "3.0.0");
|
||||||
|
nm = new NodeManager() {
|
||||||
|
@Override
|
||||||
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||||
|
MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
|
||||||
|
context, dispatcher, healthChecker, metrics);
|
||||||
|
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
|
||||||
|
myResourceTracker2.heartBeatNodeAction = NodeAction.NORMAL;
|
||||||
|
myResourceTracker2.rmVersion = "3.0.0";
|
||||||
|
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
|
||||||
|
return myNodeStatusUpdater;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void cleanupContainers(NodeManagerEventType eventType) {
|
||||||
|
super.cleanupContainers(NodeManagerEventType.SHUTDOWN);
|
||||||
|
numCleanups.incrementAndGet();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
nm.init(conf);
|
||||||
|
nm.start();
|
||||||
|
|
||||||
|
// NM takes a while to reach the STARTED state.
|
||||||
|
int waitCount = 0;
|
||||||
|
while (nm.getServiceState() != STATE.STARTED && waitCount++ != 20) {
|
||||||
|
LOG.info("Waiting for NM to stop..");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
Assert.assertTrue(nm.getServiceState() == STATE.STARTED);
|
||||||
|
nm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
private class MyNMContext extends NMContext {
|
private class MyNMContext extends NMContext {
|
||||||
ConcurrentMap<ContainerId, Container> containers =
|
ConcurrentMap<ContainerId, Container> containers =
|
||||||
new ConcurrentSkipListMap<ContainerId, Container>();
|
new ConcurrentSkipListMap<ContainerId, Container>();
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.util.VersionUtil;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||||
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.RackResolver;
|
import org.apache.hadoop.yarn.util.RackResolver;
|
||||||
|
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||||
|
|
||||||
public class ResourceTrackerService extends AbstractService implements
|
public class ResourceTrackerService extends AbstractService implements
|
||||||
ResourceTracker {
|
ResourceTracker {
|
||||||
|
@ -73,6 +75,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
private long nextHeartBeatInterval;
|
private long nextHeartBeatInterval;
|
||||||
private Server server;
|
private Server server;
|
||||||
private InetSocketAddress resourceTrackerAddress;
|
private InetSocketAddress resourceTrackerAddress;
|
||||||
|
private String minimumNodeManagerVersion;
|
||||||
|
|
||||||
private static final NodeHeartbeatResponse resync = recordFactory
|
private static final NodeHeartbeatResponse resync = recordFactory
|
||||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||||
|
@ -99,6 +102,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
this.nmLivelinessMonitor = nmLivelinessMonitor;
|
this.nmLivelinessMonitor = nmLivelinessMonitor;
|
||||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||||
this.nmTokenSecretManager = nmTokenSecretManager;
|
this.nmTokenSecretManager = nmTokenSecretManager;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -124,7 +128,11 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
minAllocVcores = conf.getInt(
|
minAllocVcores = conf.getInt(
|
||||||
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
|
||||||
|
|
||||||
|
minimumNodeManagerVersion = conf.get(
|
||||||
|
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
|
||||||
|
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
|
||||||
|
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,10 +180,30 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
int cmPort = nodeId.getPort();
|
int cmPort = nodeId.getPort();
|
||||||
int httpPort = request.getHttpPort();
|
int httpPort = request.getHttpPort();
|
||||||
Resource capability = request.getResource();
|
Resource capability = request.getResource();
|
||||||
|
String nodeManagerVersion = request.getNMVersion();
|
||||||
|
|
||||||
RegisterNodeManagerResponse response = recordFactory
|
RegisterNodeManagerResponse response = recordFactory
|
||||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
|
|
||||||
|
if (!minimumNodeManagerVersion.equals("NONE")) {
|
||||||
|
if (minimumNodeManagerVersion.equals("EqualToRM")) {
|
||||||
|
minimumNodeManagerVersion = YarnVersionInfo.getVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((nodeManagerVersion == null) ||
|
||||||
|
(VersionUtil.compareVersions(nodeManagerVersion,minimumNodeManagerVersion)) < 0) {
|
||||||
|
String message =
|
||||||
|
"Disallowed NodeManager Version " + nodeManagerVersion
|
||||||
|
+ ", is less than the minimum version "
|
||||||
|
+ minimumNodeManagerVersion + " sending SHUTDOWN signal to "
|
||||||
|
+ "NodeManager.";
|
||||||
|
LOG.info(message);
|
||||||
|
response.setDiagnosticsMessage(message);
|
||||||
|
response.setNodeAction(NodeAction.SHUTDOWN);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Check if this node is a 'valid' node
|
// Check if this node is a 'valid' node
|
||||||
if (!this.nodesListManager.isValidNode(host)) {
|
if (!this.nodesListManager.isValidNode(host)) {
|
||||||
String message =
|
String message =
|
||||||
|
@ -230,6 +258,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
LOG.info(message);
|
LOG.info(message);
|
||||||
response.setNodeAction(NodeAction.NORMAL);
|
response.setNodeAction(NodeAction.NORMAL);
|
||||||
response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
|
response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
|
||||||
|
response.setRMVersion(YarnVersionInfo.getVersion());
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -994,7 +994,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static final class AMRegisteredTransition extends BaseTransition {
|
private static final class AMRegisteredTransition extends BaseTransition {
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMAppAttemptImpl appAttempt,
|
public void transition(RMAppAttemptImpl appAttempt,
|
||||||
RMAppAttemptEvent event) {
|
RMAppAttemptEvent event) {
|
||||||
|
@ -1003,7 +1003,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
= (RMAppAttemptRegistrationEvent) event;
|
= (RMAppAttemptRegistrationEvent) event;
|
||||||
appAttempt.host = registrationEvent.getHost();
|
appAttempt.host = registrationEvent.getHost();
|
||||||
appAttempt.rpcPort = registrationEvent.getRpcport();
|
appAttempt.rpcPort = registrationEvent.getRpcport();
|
||||||
appAttempt.origTrackingUrl = registrationEvent.getTrackingurl();
|
appAttempt.origTrackingUrl =
|
||||||
|
sanitizeTrackingUrl(registrationEvent.getTrackingurl());
|
||||||
appAttempt.proxiedTrackingUrl =
|
appAttempt.proxiedTrackingUrl =
|
||||||
appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
|
appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
|
||||||
|
|
||||||
|
@ -1138,7 +1139,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
RMAppAttemptUnregistrationEvent unregisterEvent
|
RMAppAttemptUnregistrationEvent unregisterEvent
|
||||||
= (RMAppAttemptUnregistrationEvent) event;
|
= (RMAppAttemptUnregistrationEvent) event;
|
||||||
appAttempt.diagnostics.append(unregisterEvent.getDiagnostics());
|
appAttempt.diagnostics.append(unregisterEvent.getDiagnostics());
|
||||||
appAttempt.origTrackingUrl = unregisterEvent.getTrackingUrl();
|
appAttempt.origTrackingUrl =
|
||||||
|
sanitizeTrackingUrl(unregisterEvent.getTrackingUrl());
|
||||||
appAttempt.proxiedTrackingUrl =
|
appAttempt.proxiedTrackingUrl =
|
||||||
appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
|
appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
|
||||||
appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
|
appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
|
||||||
|
@ -1292,4 +1294,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
appAttempt.rmContext.getAMRMTokenSecretManager()
|
appAttempt.rmContext.getAMRMTokenSecretManager()
|
||||||
.applicationMasterFinished(appAttempt.getAppAttemptId());
|
.applicationMasterFinished(appAttempt.getAppAttemptId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String sanitizeTrackingUrl(String url) {
|
||||||
|
return (url == null || url.trim().isEmpty()) ? "N/A" : url;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
@ -96,9 +97,9 @@ public class NodeManager implements ContainerManagementProtocol {
|
||||||
RegisterNodeManagerRequest request = recordFactory
|
RegisterNodeManagerRequest request = recordFactory
|
||||||
.newRecordInstance(RegisterNodeManagerRequest.class);
|
.newRecordInstance(RegisterNodeManagerRequest.class);
|
||||||
request.setHttpPort(httpPort);
|
request.setHttpPort(httpPort);
|
||||||
request.setNodeId(this.nodeId);
|
|
||||||
request.setResource(capability);
|
request.setResource(capability);
|
||||||
request.setNodeId(this.nodeId);
|
request.setNodeId(this.nodeId);
|
||||||
|
request.setNMVersion(YarnVersionInfo.getVersion());
|
||||||
resourceTrackerService.registerNodeManager(request);
|
resourceTrackerService.registerNodeManager(request);
|
||||||
this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get(
|
this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get(
|
||||||
this.nodeId), false);
|
this.nodeId), false);
|
||||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -248,6 +249,59 @@ public class TestResourceTrackerService {
|
||||||
checkDecommissionedNMCount(rm, ++initialMetricCount);
|
checkDecommissionedNMCount(rm, ++initialMetricCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeRegistrationSuccess() throws Exception {
|
||||||
|
writeToHostsFile("host2");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
|
||||||
|
.getAbsolutePath());
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
ResourceTrackerService resourceTrackerService = rm.getResourceTrackerService();
|
||||||
|
RegisterNodeManagerRequest req = Records.newRecord(
|
||||||
|
RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId = NodeId.newInstance("host2", 1234);
|
||||||
|
Resource capability = BuilderUtils.newResource(1024, 1);
|
||||||
|
req.setResource(capability);
|
||||||
|
req.setNodeId(nodeId);
|
||||||
|
req.setHttpPort(1234);
|
||||||
|
req.setNMVersion(YarnVersionInfo.getVersion());
|
||||||
|
// trying to register a invalid node.
|
||||||
|
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
|
||||||
|
Assert.assertEquals(NodeAction.NORMAL,response.getNodeAction());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeRegistrationVersionLessThanRM() throws Exception {
|
||||||
|
writeToHostsFile("host2");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
|
||||||
|
.getAbsolutePath());
|
||||||
|
conf.set(YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,"EqualToRM" );
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
String nmVersion = "1.9.9";
|
||||||
|
|
||||||
|
ResourceTrackerService resourceTrackerService = rm.getResourceTrackerService();
|
||||||
|
RegisterNodeManagerRequest req = Records.newRecord(
|
||||||
|
RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId = NodeId.newInstance("host2", 1234);
|
||||||
|
Resource capability = BuilderUtils.newResource(1024, 1);
|
||||||
|
req.setResource(capability);
|
||||||
|
req.setNodeId(nodeId);
|
||||||
|
req.setHttpPort(1234);
|
||||||
|
req.setNMVersion(nmVersion);
|
||||||
|
// trying to register a invalid node.
|
||||||
|
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
|
||||||
|
Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
|
||||||
|
Assert.assertTrue("Diagnostic message did not contain: 'Disallowed NodeManager " +
|
||||||
|
"Version "+ nmVersion + ", is less than the minimum version'",
|
||||||
|
response.getDiagnosticsMessage().contains("Disallowed NodeManager Version " +
|
||||||
|
nmVersion + ", is less than the minimum version "));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNodeRegistrationFailure() throws Exception {
|
public void testNodeRegistrationFailure() throws Exception {
|
||||||
writeToHostsFile("host1");
|
writeToHostsFile("host1");
|
||||||
|
|
|
@ -1,77 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
|
||||||
|
|
||||||
import junit.framework.Assert;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
|
||||||
public class TestRMAppAttemptImpl {
|
|
||||||
|
|
||||||
private void testTrackingUrl(String url, boolean unmanaged) {
|
|
||||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance
|
|
||||||
(ApplicationId.newInstance(1, 2), 1);
|
|
||||||
EventHandler handler = Mockito.mock(EventHandler.class);
|
|
||||||
Dispatcher dispatcher = Mockito.mock(Dispatcher.class);
|
|
||||||
Mockito.when(dispatcher.getEventHandler()).thenReturn(handler);
|
|
||||||
RMContext rmContext = Mockito.mock(RMContext.class);
|
|
||||||
Mockito.when(rmContext.getDispatcher()).thenReturn(dispatcher);
|
|
||||||
|
|
||||||
ApplicationSubmissionContext appContext =
|
|
||||||
Mockito.mock(ApplicationSubmissionContext.class);
|
|
||||||
Mockito.when(appContext.getUnmanagedAM()).thenReturn(unmanaged);
|
|
||||||
|
|
||||||
RMAppAttemptImpl attempt = new RMAppAttemptImpl(attemptId, rmContext, null,
|
|
||||||
null, appContext, new YarnConfiguration(), null);
|
|
||||||
RMAppAttemptRegistrationEvent event =
|
|
||||||
Mockito.mock(RMAppAttemptRegistrationEvent.class);
|
|
||||||
Mockito.when(event.getHost()).thenReturn("h");
|
|
||||||
Mockito.when(event.getRpcport()).thenReturn(0);
|
|
||||||
Mockito.when(event.getTrackingurl()).thenReturn(url);
|
|
||||||
new RMAppAttemptImpl.AMRegisteredTransition().transition(attempt, event);
|
|
||||||
if (unmanaged) {
|
|
||||||
Assert.assertEquals(url, attempt.getTrackingUrl());
|
|
||||||
} else {
|
|
||||||
Assert.assertNotSame(url, attempt.getTrackingUrl());
|
|
||||||
Assert.assertTrue(attempt.getTrackingUrl().contains(
|
|
||||||
ProxyUriUtils.PROXY_SERVLET_NAME));
|
|
||||||
Assert.assertTrue(attempt.getTrackingUrl().contains(
|
|
||||||
attemptId.getApplicationId().toString()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testTrackingUrlUnmanagedAM() {
|
|
||||||
testTrackingUrl("http://foo:8000/x", true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testTrackingUrlManagedAM() {
|
|
||||||
testTrackingUrl("bar:8000/x", false);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -30,14 +30,18 @@ import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
|
@ -85,8 +89,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
|
||||||
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
@ -261,8 +267,22 @@ public class TestRMAppAttemptTransitions {
|
||||||
|
|
||||||
|
|
||||||
private String getProxyUrl(RMAppAttempt appAttempt) {
|
private String getProxyUrl(RMAppAttempt appAttempt) {
|
||||||
return pjoin(RM_WEBAPP_ADDR, "proxy",
|
String url = null;
|
||||||
appAttempt.getAppAttemptId().getApplicationId(), "");
|
try {
|
||||||
|
URI trackingUri =
|
||||||
|
StringUtils.isEmpty(appAttempt.getOriginalTrackingUrl()) ? null :
|
||||||
|
ProxyUriUtils
|
||||||
|
.getUriFromAMUrl(appAttempt.getOriginalTrackingUrl());
|
||||||
|
String proxy = WebAppUtils.getProxyHostAndPort(conf);
|
||||||
|
URI proxyUri = ProxyUriUtils.getUriFromAMUrl(proxy);
|
||||||
|
URI result = ProxyUriUtils.getProxyUri(trackingUri, proxyUri,
|
||||||
|
appAttempt.getAppAttemptId().getApplicationId());
|
||||||
|
url = result.toASCIIString().substring(
|
||||||
|
HttpConfig.getSchemePrefix().length());
|
||||||
|
} catch (URISyntaxException ex) {
|
||||||
|
Assert.fail();
|
||||||
|
}
|
||||||
|
return url;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -448,9 +468,9 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(container, applicationAttempt.getMasterContainer());
|
assertEquals(container, applicationAttempt.getMasterContainer());
|
||||||
assertEquals(host, applicationAttempt.getHost());
|
assertEquals(host, applicationAttempt.getHost());
|
||||||
assertEquals(rpcPort, applicationAttempt.getRpcPort());
|
assertEquals(rpcPort, applicationAttempt.getRpcPort());
|
||||||
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
verifyUrl(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||||
if (unmanagedAM) {
|
if (unmanagedAM) {
|
||||||
assertEquals("oldtrackingurl", applicationAttempt.getTrackingUrl());
|
verifyUrl(trackingUrl, applicationAttempt.getTrackingUrl());
|
||||||
} else {
|
} else {
|
||||||
assertEquals(getProxyUrl(applicationAttempt),
|
assertEquals(getProxyUrl(applicationAttempt),
|
||||||
applicationAttempt.getTrackingUrl());
|
applicationAttempt.getTrackingUrl());
|
||||||
|
@ -468,7 +488,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(RMAppAttemptState.FINISHING,
|
assertEquals(RMAppAttemptState.FINISHING,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
||||||
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
verifyUrl(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||||
assertEquals(getProxyUrl(applicationAttempt),
|
assertEquals(getProxyUrl(applicationAttempt),
|
||||||
applicationAttempt.getTrackingUrl());
|
applicationAttempt.getTrackingUrl());
|
||||||
assertEquals(container, applicationAttempt.getMasterContainer());
|
assertEquals(container, applicationAttempt.getMasterContainer());
|
||||||
|
@ -487,9 +507,9 @@ public class TestRMAppAttemptTransitions {
|
||||||
assertEquals(RMAppAttemptState.FINISHED,
|
assertEquals(RMAppAttemptState.FINISHED,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
||||||
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
verifyUrl(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||||
if (unmanagedAM) {
|
if (unmanagedAM) {
|
||||||
assertEquals("mytrackingurl", applicationAttempt.getTrackingUrl());
|
verifyUrl(trackingUrl, applicationAttempt.getTrackingUrl());
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
assertEquals(getProxyUrl(applicationAttempt),
|
assertEquals(getProxyUrl(applicationAttempt),
|
||||||
|
@ -603,9 +623,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
trackingUrl, diagnostics);
|
trackingUrl, diagnostics);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testUnmanagedAMSuccess(String url) {
|
||||||
@Test
|
|
||||||
public void testUnmanagedAMSuccess() {
|
|
||||||
unmanagedAM = true;
|
unmanagedAM = true;
|
||||||
when(submissionContext.getUnmanagedAM()).thenReturn(true);
|
when(submissionContext.getUnmanagedAM()).thenReturn(true);
|
||||||
// submit AM and check it goes to LAUNCHED state
|
// submit AM and check it goes to LAUNCHED state
|
||||||
|
@ -615,7 +633,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
applicationAttempt.getAppAttemptId());
|
applicationAttempt.getAppAttemptId());
|
||||||
|
|
||||||
// launch AM
|
// launch AM
|
||||||
runApplicationAttempt(null, "host", 8042, "oldtrackingurl", true);
|
runApplicationAttempt(null, "host", 8042, url, true);
|
||||||
|
|
||||||
// complete a container
|
// complete a container
|
||||||
applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent(
|
applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent(
|
||||||
|
@ -623,13 +641,12 @@ public class TestRMAppAttemptTransitions {
|
||||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||||
applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class)));
|
applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class)));
|
||||||
// complete AM
|
// complete AM
|
||||||
String trackingUrl = "mytrackingurl";
|
|
||||||
String diagnostics = "Successful";
|
String diagnostics = "Successful";
|
||||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||||
applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(
|
applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(
|
||||||
applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus,
|
applicationAttempt.getAppAttemptId(), url, finalStatus,
|
||||||
diagnostics));
|
diagnostics));
|
||||||
testAppAttemptFinishedState(null, finalStatus, trackingUrl, diagnostics, 1,
|
testAppAttemptFinishedState(null, finalStatus, url, diagnostics, 1,
|
||||||
true);
|
true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -824,12 +841,42 @@ public class TestRMAppAttemptTransitions {
|
||||||
"Killed by user");
|
"Killed by user");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTrackingUrlUnmanagedAM() {
|
||||||
|
testUnmanagedAMSuccess("oldTrackingUrl");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNoTrackingUrl() {
|
public void testEmptyTrackingUrlUnmanagedAM() {
|
||||||
|
testUnmanagedAMSuccess("");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNullTrackingUrlUnmanagedAM() {
|
||||||
|
testUnmanagedAMSuccess(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testManagedAMWithTrackingUrl() {
|
||||||
|
testTrackingUrlManagedAM("theTrackingUrl");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testManagedAMWithEmptyTrackingUrl() {
|
||||||
|
testTrackingUrlManagedAM("");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testManagedAMWithNullTrackingUrl() {
|
||||||
|
testTrackingUrlManagedAM(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testTrackingUrlManagedAM(String url) {
|
||||||
Container amContainer = allocateApplicationAttempt();
|
Container amContainer = allocateApplicationAttempt();
|
||||||
launchApplicationAttempt(amContainer);
|
launchApplicationAttempt(amContainer);
|
||||||
runApplicationAttempt(amContainer, "host", 8042, "", false);
|
runApplicationAttempt(amContainer, "host", 8042, url, false);
|
||||||
|
unregisterApplicationAttempt(amContainer,
|
||||||
|
FinalApplicationStatus.SUCCEEDED, url, "Successful");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -927,4 +974,12 @@ public class TestRMAppAttemptTransitions {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyUrl(String url1, String url2) {
|
||||||
|
if (url1 == null || url1.trim().isEmpty()) {
|
||||||
|
assertEquals("N/A", url2);
|
||||||
|
} else {
|
||||||
|
assertEquals(url1, url2);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue