diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 26489c063ea..1566b9c49b7 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -90,7 +90,6 @@ Release 0.23.2 - UNRELEASED (sharad, todd via todd) BUG FIXES - HADOOP-8054 NPE with FilterFileSystem (Daryn Sharp via bobby) HADOOP-8042 When copying a file out of HDFS, modifying it, and uploading it back into HDFS, the put fails due to a CRC mismatch @@ -117,7 +116,21 @@ Release 0.23.2 - UNRELEASED HADOOP-8083 javadoc generation for some modules is not done under target/ (tucu) -Release 0.23.1 - 2012-02-08 + HADOOP-8036. TestViewFsTrash assumes the user's home directory is + 2 levels deep. (Colin Patrick McCabe via eli) + + HADOOP-8046 Revert StaticMapping semantics to the existing ones, add DNS + mapping diagnostics in progress (stevel) + + HADOOP-8057 hadoop-setup-conf.sh not working because of some extra spaces. + (Vinayakumar B via stevel) + + HADOOP-7680 TestHardLink fails on Mac OS X, when gnu stat is in path. + (Milind Bhandarkar via stevel) + + HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf) + +Release 0.23.1 - 2012-02-17 INCOMPATIBLE CHANGES @@ -340,6 +353,8 @@ Release 0.23.1 - 2012-02-08 HADOOP-8013. ViewFileSystem does not honor setVerifyChecksum (Daryn Sharp via bobby) + HADOOP-8054 NPE with FilterFileSystem (Daryn Sharp via bobby) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java index e078aaa8c39..eba1e0c6c8b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HardLink.java @@ -64,7 +64,7 @@ public class HardLink { //override getLinkCountCommand for the particular Unix variant //Linux is already set as the default - {"stat","-c%h", null} if (osType == OSType.OS_TYPE_MAC) { - String[] linkCountCmdTemplate = {"stat","-f%l", null}; + String[] linkCountCmdTemplate = {"/usr/bin/stat","-f%l", null}; HardLinkCGUnix.setLinkCountCmdTemplate(linkCountCmdTemplate); } else if (osType == OSType.OS_TYPE_SOLARIS) { String[] linkCountCmdTemplate = {"ls","-l", null}; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java index 80e4c95c87c..069330001fa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSourceAdapter.java @@ -94,17 +94,19 @@ class MetricsSourceAdapter implements DynamicMBean { } @Override - public synchronized Object getAttribute(String attribute) + public Object getAttribute(String attribute) throws AttributeNotFoundException, MBeanException, ReflectionException { updateJmxCache(); - Attribute a = attrCache.get(attribute); - if (a == null) { - throw new AttributeNotFoundException(attribute +" not found"); + synchronized(this) { + Attribute a = attrCache.get(attribute); + if (a == null) { + throw new AttributeNotFoundException(attribute +" not found"); + } + if (LOG.isDebugEnabled()) { + LOG.debug(attribute +": "+ a); + } + return a.getValue(); } - if 
(LOG.isDebugEnabled()) { - LOG.debug(attribute +": "+ a); - } - return a.getValue(); } @Override @@ -115,17 +117,19 @@ class MetricsSourceAdapter implements DynamicMBean { } @Override - public synchronized AttributeList getAttributes(String[] attributes) { + public AttributeList getAttributes(String[] attributes) { updateJmxCache(); - AttributeList ret = new AttributeList(); - for (String key : attributes) { - Attribute attr = attrCache.get(key); - if (LOG.isDebugEnabled()) { - LOG.debug(key +": "+ attr); + synchronized(this) { + AttributeList ret = new AttributeList(); + for (String key : attributes) { + Attribute attr = attrCache.get(key); + if (LOG.isDebugEnabled()) { + LOG.debug(key +": "+ attr); + } + ret.add(attr); } - ret.add(attr); + return ret; } - return ret; } @Override @@ -140,17 +144,32 @@ class MetricsSourceAdapter implements DynamicMBean { } @Override - public synchronized MBeanInfo getMBeanInfo() { + public MBeanInfo getMBeanInfo() { updateJmxCache(); return infoCache; } - private synchronized void updateJmxCache() { - if (System.currentTimeMillis() - jmxCacheTS >= jmxCacheTTL) { - if (lastRecs == null) { - MetricsCollectorImpl builder = new MetricsCollectorImpl(); - getMetrics(builder, true); + private void updateJmxCache() { + boolean getAllMetrics = false; + synchronized(this) { + if (System.currentTimeMillis() - jmxCacheTS >= jmxCacheTTL) { + // temporarilly advance the expiry while updating the cache + jmxCacheTS = System.currentTimeMillis() + jmxCacheTTL; + if (lastRecs == null) { + getAllMetrics = true; + } } + else { + return; + } + } + + if (getAllMetrics) { + MetricsCollectorImpl builder = new MetricsCollectorImpl(); + getMetrics(builder, true); + } + + synchronized(this) { int oldCacheSize = attrCache.size(); int newCacheSize = updateAttrCache(); if (oldCacheSize < newCacheSize) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/AbstractDNSToSwitchMapping.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/AbstractDNSToSwitchMapping.java index a1b185dd085..b1f0fb230f8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/AbstractDNSToSwitchMapping.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/AbstractDNSToSwitchMapping.java @@ -22,6 +22,12 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; /** * This is a base class for DNS to Switch mappings.

It is not mandatory to @@ -89,6 +95,49 @@ public abstract class AbstractDNSToSwitchMapping return false; } + /** + * Get a copy of the map (for diagnostics) + * @return a clone of the map or null for none known + */ + public Map getSwitchMap() { + return null; + } + + /** + * Generate a string listing the switch mapping implementation, + * the mapping for every known node and the number of nodes and + * unique switches known about -each entry to a separate line. + * @return a string that can be presented to the ops team or used in + * debug messages. + */ + public String dumpTopology() { + Map rack = getSwitchMap(); + StringBuilder builder = new StringBuilder(); + builder.append("Mapping: ").append(toString()).append("\n"); + if (rack != null) { + builder.append("Map:\n"); + Set switches = new HashSet(); + for (Map.Entry entry : rack.entrySet()) { + builder.append(" ") + .append(entry.getKey()) + .append(" -> ") + .append(entry.getValue()) + .append("\n"); + switches.add(entry.getValue()); + } + builder.append("Nodes: ").append(rack.size()).append("\n"); + builder.append("Switches: ").append(switches.size()).append("\n"); + } else { + builder.append("No topology information"); + } + return builder.toString(); + } + + protected boolean isSingleSwitchByScriptPolicy() { + return conf != null + && conf.get(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null; + } + /** * Query for a {@link DNSToSwitchMapping} instance being on a single * switch. @@ -100,7 +149,7 @@ public abstract class AbstractDNSToSwitchMapping * is not derived from this class. */ public static boolean isMappingSingleSwitch(DNSToSwitchMapping mapping) { - return mapping instanceof AbstractDNSToSwitchMapping + return mapping != null && mapping instanceof AbstractDNSToSwitchMapping && ((AbstractDNSToSwitchMapping) mapping).isSingleSwitch(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java index bdfc95cb0e4..a447c6a084f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/CachedDNSToSwitchMapping.java @@ -18,6 +18,7 @@ package org.apache.hadoop.net; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -123,6 +124,22 @@ public class CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping { } + /** + * Get the (host x switch) map. 
+ * @return a copy of the cached map of hosts to rack + */ + @Override + public Map getSwitchMap() { + Map switchMap = new HashMap(cache); + return switchMap; + } + + + @Override + public String toString() { + return "cached switch mapping relaying to " + rawMapping; + } + /** * Delegate the switch topology query to the raw mapping, via * {@link AbstractDNSToSwitchMapping#isMappingSingleSwitch(DNSToSwitchMapping)} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java index 4e2dcf6d4c7..a41a42463c7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/ScriptBasedMapping.java @@ -66,9 +66,15 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY ; /** * key to the argument count that the script supports + * {@value} */ static final String SCRIPT_ARG_COUNT_KEY = CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY ; + /** + * Text used in the {@link #toString()} method if there is no string + * {@value} + */ + public static final String NO_SCRIPT = "no script"; /** * Create an instance with the default configuration. @@ -104,6 +110,11 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { return getRawMapping().getConf(); } + @Override + public String toString() { + return "script-based mapping with " + getRawMapping().toString(); + } + /** * {@inheritDoc} *

@@ -231,7 +242,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { s.execute(); allOutput.append(s.getOutput()).append(" "); } catch (Exception e) { - LOG.warn("Exception: ", e); + LOG.warn("Exception running " + s, e); return null; } loopCount++; @@ -248,5 +259,10 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping { public boolean isSingleSwitch() { return scriptName == null; } + + @Override + public String toString() { + return scriptName != null ? ("script " + scriptName) : NO_SCRIPT; + } } } diff --git a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-conf.sh b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-conf.sh index 1dbe87359f1..50549280909 100644 --- a/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-conf.sh +++ b/hadoop-common-project/hadoop-common/src/main/packages/hadoop-setup-conf.sh @@ -246,7 +246,7 @@ OPTS=$(getopt \ -l 'dfs-datanode-dir-perm:' \ -l 'dfs-block-local-path-access-user:' \ -l 'dfs-client-read-shortcircuit:' \ - -l 'dfs-client-read-shortcircuit-skip-checksum:' \ + -l 'dfs-client-read-shortcircuit-skip-checksum:' \ -o 'h' \ -- "$@") diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestFSMainOperationsLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestFSMainOperationsLocalFileSystem.java index 233f2aaaf66..2f8d8ce8486 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestFSMainOperationsLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestFSMainOperationsLocalFileSystem.java @@ -37,15 +37,15 @@ public class TestFSMainOperationsLocalFileSystem extends FSMainOperationsBaseTes public void setUp() throws Exception { Configuration conf = new Configuration(); fcTarget = FileSystem.getLocal(conf); - fSys = ViewFileSystemTestSetup.setupForViewFs( - ViewFileSystemTestSetup.configWithViewfsScheme(), fcTarget); + fSys = ViewFileSystemTestSetup.setupForViewFileSystem( + ViewFileSystemTestSetup.createConfig(), fcTarget); super.setUp(); } @After public void tearDown() throws Exception { super.tearDown(); - ViewFileSystemTestSetup.tearDownForViewFs(fcTarget); + ViewFileSystemTestSetup.tearDown(fcTarget); } @Test diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java index b9c0a087b31..e320f87bbce 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java @@ -40,12 +40,12 @@ public class TestViewFileSystemDelegation { //extends ViewFileSystemTestSetup { @BeforeClass public static void setup() throws Exception { - conf = ViewFileSystemTestSetup.configWithViewfsScheme(); + conf = ViewFileSystemTestSetup.createConfig(); fs1 = setupFileSystem(new URI("fs1:/"), FakeFileSystem.class); fs2 = setupFileSystem(new URI("fs2:/"), FakeFileSystem.class); viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf); } - + static FakeFileSystem setupFileSystem(URI uri, Class clazz) throws Exception { String scheme = uri.getScheme(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java index 7795c3f5f02..81270c2320d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java @@ -35,7 +35,6 @@ import org.mortbay.log.Log; public class TestViewFsTrash { FileSystem fsTarget; // the target file system - the mount will point here FileSystem fsView; - Path targetTestRoot; Configuration conf; static class TestLFS extends LocalFileSystem { @@ -55,52 +54,19 @@ public class TestViewFsTrash { @Before public void setUp() throws Exception { fsTarget = FileSystem.getLocal(new Configuration()); - targetTestRoot = FileSystemTestHelper.getAbsoluteTestRootPath(fsTarget); - // In case previous test was killed before cleanup - fsTarget.delete(targetTestRoot, true); - // cleanup trash from previous run if it stuck around - fsTarget.delete(new Path(fsTarget.getHomeDirectory(), ".Trash/Current"), - true); - - fsTarget.mkdirs(targetTestRoot); - fsTarget.mkdirs(new Path(targetTestRoot,"dir1")); - - - // Now we use the mount fs to set links to user and dir - // in the test root - - // Set up the defaultMT in the config with our mount point links - - - conf = ViewFileSystemTestSetup.configWithViewfsScheme(); - - // create a link for home directory so that trash path works - // set up viewfs's home dir root to point to home dir root on target - // But home dir is different on linux, mac etc. - // Figure it out by calling home dir on target - - String homeDirRoot = fsTarget.getHomeDirectory() - .getParent().toUri().getPath(); - ConfigUtil.addLink(conf, homeDirRoot, - fsTarget.makeQualified(new Path(homeDirRoot)).toUri()); - ConfigUtil.setHomeDirConf(conf, homeDirRoot); - Log.info("Home dir base " + homeDirRoot); - - fsView = ViewFileSystemTestSetup.setupForViewFs(conf, fsTarget); - - // set working dir so that relative paths - //fsView.setWorkingDirectory(new Path(fsTarget.getWorkingDirectory().toUri().getPath())); + fsTarget.mkdirs(new Path(FileSystemTestHelper. 
+ getTestRootPath(fsTarget), "dir1")); + conf = ViewFileSystemTestSetup.createConfig(); + fsView = ViewFileSystemTestSetup.setupForViewFileSystem(conf, fsTarget); conf.set("fs.defaultFS", FsConstants.VIEWFS_URI.toString()); } - @After public void tearDown() throws Exception { - fsTarget.delete(targetTestRoot, true); + ViewFileSystemTestSetup.tearDown(fsTarget); fsTarget.delete(new Path(fsTarget.getHomeDirectory(), ".Trash/Current"), true); } - @Test public void testTrash() throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java index 5276a06207c..1de434e3a96 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java @@ -89,7 +89,7 @@ public class ViewFileSystemBaseTest { // Set up the defaultMT in the config with our mount point links //Configuration conf = new Configuration(); - conf = ViewFileSystemTestSetup.configWithViewfsScheme(); + conf = ViewFileSystemTestSetup.createConfig(); setupMountPoints(); fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java index e1c28e696f7..11f4d7af713 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.FsConstants; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.viewfs.ConfigUtil; +import org.mortbay.log.Log; /** @@ -46,32 +47,21 @@ public class ViewFileSystemTestSetup { * @return return the ViewFS File context to be used for tests * @throws Exception */ - static public FileSystem setupForViewFs(Configuration conf, FileSystem fsTarget) throws Exception { + static public FileSystem setupForViewFileSystem(Configuration conf, FileSystem fsTarget) throws Exception { /** * create the test root on local_fs - the mount table will point here */ - Path targetOfTests = FileSystemTestHelper.getTestRootPath(fsTarget); - // In case previous test was killed before cleanup - fsTarget.delete(targetOfTests, true); - - fsTarget.mkdirs(targetOfTests); - + fsTarget.mkdirs(FileSystemTestHelper.getTestRootPath(fsTarget)); + + // viewFs://home => fsTarget://home + String homeDirRoot = fsTarget.getHomeDirectory() + .getParent().toUri().getPath(); + ConfigUtil.addLink(conf, homeDirRoot, + fsTarget.makeQualified(new Path(homeDirRoot)).toUri()); + ConfigUtil.setHomeDirConf(conf, homeDirRoot); + Log.info("Home dir base " + homeDirRoot); - // Now set up a link from viewfs to targetfs for the first component of - // path of testdir. For example, if testdir is /user//xx then - // a link from /user to targetfs://user. 
- - String testDir = FileSystemTestHelper.getTestRootPath(fsTarget).toUri().getPath(); - int indexOf2ndSlash = testDir.indexOf('/', 1); - String testDirFirstComponent = testDir.substring(0, indexOf2ndSlash); - - - ConfigUtil.addLink(conf, testDirFirstComponent, - fsTarget.makeQualified(new Path(testDirFirstComponent)).toUri()); - FileSystem fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf); - //System.out.println("SRCOfTests = "+ getTestRootPath(fs, "test")); - //System.out.println("TargetOfTests = "+ targetOfTests.toUri()); return fsView; } @@ -79,12 +69,12 @@ public class ViewFileSystemTestSetup { * * delete the test directory in the target fs */ - static public void tearDownForViewFs(FileSystem fsTarget) throws Exception { + static public void tearDown(FileSystem fsTarget) throws Exception { Path targetOfTests = FileSystemTestHelper.getTestRootPath(fsTarget); fsTarget.delete(targetOfTests, true); } - public static Configuration configWithViewfsScheme() { + public static Configuration createConfig() { Configuration conf = new Configuration(); conf.set("fs.viewfs.impl", ViewFileSystem.class.getName()); return conf; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/StaticMapping.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/StaticMapping.java index 42a8e79bc00..379e9401d48 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/StaticMapping.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/StaticMapping.java @@ -21,8 +21,10 @@ import org.apache.hadoop.conf.Configuration; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * Implements the {@link DNSToSwitchMapping} via static mappings. Used @@ -34,6 +36,10 @@ import java.util.Map; * When an instance of the class has its {@link #setConf(Configuration)} * method called, nodes listed in the configuration will be added to the map. * These do not get removed when the instance is garbage collected. + * + * The switch mapping policy of this class is the same as for the + * {@link ScriptBasedMapping} -the presence of a non-empty topology script. + * The script itself is not used. 
*/ public class StaticMapping extends AbstractDNSToSwitchMapping { @@ -109,12 +115,30 @@ public class StaticMapping extends AbstractDNSToSwitchMapping { } /** - * Declare that this mapping is always multi-switch + * The switch policy of this mapping is driven by the same policy + * as the Scripted mapping: the presence of the script name in + * the configuration file * @return false, always */ @Override public boolean isSingleSwitch() { - return false; + return isSingleSwitchByScriptPolicy(); + } + + /** + * Get a copy of the map (for diagnostics) + * @return a clone of the map or null for none known + */ + @Override + public Map getSwitchMap() { + synchronized (nameToRackMap) { + return new HashMap(nameToRackMap); + } + } + + @Override + public String toString() { + return "static mapping with single switch = " + isSingleSwitch(); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestStaticMapping.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestStaticMapping.java index 90963a5bdb6..f3c0a5cb255 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestStaticMapping.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestStaticMapping.java @@ -18,22 +18,27 @@ package org.apache.hadoop.net; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Test the static mapping class. * Because the map is actually static, this map needs to be reset for every test */ public class TestStaticMapping extends Assert { + private static final Log LOG = LogFactory.getLog(TestStaticMapping.class); /** * Reset the map then create a new instance of the {@link StaticMapping} - * class + * class with a null configuration * @return a new instance */ private StaticMapping newInstance() { @@ -41,63 +46,195 @@ public class TestStaticMapping extends Assert { return new StaticMapping(); } - @Test - public void testStaticIsSingleSwitch() throws Throwable { + + /** + * Reset the map then create a new instance of the {@link StaticMapping} + * class with the topology script in the configuration set to + * the parameter + * @param script a (never executed) script, can be null + * @return a new instance + */ + private StaticMapping newInstance(String script) { StaticMapping mapping = newInstance(); - assertFalse("Empty maps should not be not single switch", - mapping.isSingleSwitch()); + mapping.setConf(createConf(script)); + return mapping; } + /** + * Create a configuration with a specific topology script + * @param script a (never executed) script, can be null + * @return a configuration + */ + private Configuration createConf(String script) { + Configuration conf = new Configuration(); + if (script != null) { + conf.set(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, + script); + } else { + conf.unset(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY); + } + return conf; + } + + private void assertSingleSwitch(DNSToSwitchMapping mapping) { + assertEquals("Expected a single switch mapping " + + mapping, + true, + AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping)); + } + + private void assertMultiSwitch(DNSToSwitchMapping mapping) { + assertEquals("Expected a multi switch mapping " + + mapping, + false, + 
AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping)); + } + + protected void assertMapSize(AbstractDNSToSwitchMapping switchMapping, int expectedSize) { + assertEquals( + "Expected two entries in the map " + switchMapping.dumpTopology(), + expectedSize, switchMapping.getSwitchMap().size()); + } + + private List createQueryList() { + List l1 = new ArrayList(2); + l1.add("n1"); + l1.add("unknown"); + return l1; + } @Test - public void testCachingRelaysQueries() throws Throwable { - StaticMapping staticMapping = newInstance(); - CachedDNSToSwitchMapping mapping = - new CachedDNSToSwitchMapping(staticMapping); - StaticMapping.addNodeToRack("n1", "r1"); - assertFalse("Expected multi switch", mapping.isSingleSwitch()); + public void testStaticIsSingleSwitchOnNullScript() throws Throwable { + StaticMapping mapping = newInstance(null); + mapping.setConf(createConf(null)); + assertSingleSwitch(mapping); + } + + @Test + public void testStaticIsMultiSwitchOnScript() throws Throwable { + StaticMapping mapping = newInstance("ls"); + assertMultiSwitch(mapping); } @Test public void testAddResolveNodes() throws Throwable { StaticMapping mapping = newInstance(); - StaticMapping.addNodeToRack("n1", "r1"); - List l1 = new ArrayList(2); - l1.add("n1"); - l1.add("unknown"); - List mappings = mapping.resolve(l1); - assertEquals(2, mappings.size()); - assertEquals("r1", mappings.get(0)); - assertEquals(NetworkTopology.DEFAULT_RACK, mappings.get(1)); - assertFalse("Mapping is still single switch", mapping.isSingleSwitch()); + StaticMapping.addNodeToRack("n1", "/r1"); + List queryList = createQueryList(); + List resolved = mapping.resolve(queryList); + assertEquals(2, resolved.size()); + assertEquals("/r1", resolved.get(0)); + assertEquals(NetworkTopology.DEFAULT_RACK, resolved.get(1)); + // get the switch map and examine it + Map switchMap = mapping.getSwitchMap(); + String topology = mapping.dumpTopology(); + LOG.info(topology); + assertEquals(topology, 1, switchMap.size()); + assertEquals(topology, "/r1", switchMap.get("n1")); } + /** + * Verify that a configuration string builds a topology + */ @Test public void testReadNodesFromConfig() throws Throwable { StaticMapping mapping = newInstance(); Configuration conf = new Configuration(); - conf.set(StaticMapping.KEY_HADOOP_CONFIGURED_NODE_MAPPING, "n1=r1,n2=r2"); + conf.set(StaticMapping.KEY_HADOOP_CONFIGURED_NODE_MAPPING, "n1=/r1,n2=/r2"); mapping.setConf(conf); + //even though we have inserted elements into the list, because + //it is driven by the script key in the configuration, it still + //thinks that it is single rack + assertSingleSwitch(mapping); List l1 = new ArrayList(3); l1.add("n1"); l1.add("unknown"); l1.add("n2"); - List mappings = mapping.resolve(l1); - assertEquals(3, mappings.size()); - assertEquals("r1", mappings.get(0)); - assertEquals(NetworkTopology.DEFAULT_RACK, mappings.get(1)); - assertEquals("r2", mappings.get(2)); - assertFalse("Expected to be multi switch", - AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping)); + List resolved = mapping.resolve(l1); + assertEquals(3, resolved.size()); + assertEquals("/r1", resolved.get(0)); + assertEquals(NetworkTopology.DEFAULT_RACK, resolved.get(1)); + assertEquals("/r2", resolved.get(2)); + + Map switchMap = mapping.getSwitchMap(); + String topology = mapping.dumpTopology(); + LOG.info(topology); + assertEquals(topology, 2, switchMap.size()); + assertEquals(topology, "/r1", switchMap.get("n1")); + assertNull(topology, switchMap.get("unknown")); } + + /** + * Verify that if the inner 
mapping is single-switch, so is the cached one + * @throws Throwable on any problem + */ @Test - public void testNullConfiguration() throws Throwable { - StaticMapping mapping = newInstance(); - mapping.setConf(null); - assertFalse("Null maps are expected to be multi switch", - mapping.isSingleSwitch()); - assertFalse("Expected to be multi switch", - AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping)); + public void testCachingRelaysSingleSwitchQueries() throws Throwable { + //create a single switch map + StaticMapping staticMapping = newInstance(null); + assertSingleSwitch(staticMapping); + CachedDNSToSwitchMapping cachedMap = + new CachedDNSToSwitchMapping(staticMapping); + LOG.info("Mapping: " + cachedMap + "\n" + cachedMap.dumpTopology()); + assertSingleSwitch(cachedMap); } + + /** + * Verify that if the inner mapping is multi-switch, so is the cached one + * @throws Throwable on any problem + */ + @Test + public void testCachingRelaysMultiSwitchQueries() throws Throwable { + StaticMapping staticMapping = newInstance("top"); + assertMultiSwitch(staticMapping); + CachedDNSToSwitchMapping cachedMap = + new CachedDNSToSwitchMapping(staticMapping); + LOG.info("Mapping: " + cachedMap + "\n" + cachedMap.dumpTopology()); + assertMultiSwitch(cachedMap); + } + + + /** + * This test verifies that resultion queries get relayed to the inner rack + * @throws Throwable on any problem + */ + @Test + public void testCachingRelaysResolveQueries() throws Throwable { + StaticMapping mapping = newInstance(); + mapping.setConf(createConf("top")); + StaticMapping staticMapping = mapping; + CachedDNSToSwitchMapping cachedMap = + new CachedDNSToSwitchMapping(staticMapping); + assertMapSize(cachedMap, 0); + //add a node to the static map + StaticMapping.addNodeToRack("n1", "/r1"); + //verify it is there + assertMapSize(staticMapping, 1); + //verify that the cache hasn't picked it up yet + assertMapSize(cachedMap, 0); + //now relay the query + cachedMap.resolve(createQueryList()); + //and verify the cache is no longer empty + assertMapSize(cachedMap, 2); + } + + /** + * This test verifies that resultion queries get relayed to the inner rack + * @throws Throwable on any problem + */ + @Test + public void testCachingCachesNegativeEntries() throws Throwable { + StaticMapping staticMapping = newInstance(); + CachedDNSToSwitchMapping cachedMap = + new CachedDNSToSwitchMapping(staticMapping); + assertMapSize(cachedMap, 0); + assertMapSize(staticMapping, 0); + List resolved = cachedMap.resolve(createQueryList()); + //and verify the cache is no longer empty while the static map is + assertMapSize(staticMapping, 0); + assertMapSize(cachedMap, 2); + } + + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSwitchMapping.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSwitchMapping.java index e66a5c01652..c2fcf172c0c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSwitchMapping.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSwitchMapping.java @@ -18,6 +18,8 @@ package org.apache.hadoop.net; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.junit.Assert; import org.junit.Test; @@ -28,22 +30,87 @@ import java.util.List; */ public class TestSwitchMapping extends Assert { + + /** + * Verify the switch mapping query handles arbitrary DNSToSwitchMapping + * implementations + * + * @throws Throwable on any 
problem + */ @Test public void testStandaloneClassesAssumedMultiswitch() throws Throwable { DNSToSwitchMapping mapping = new StandaloneSwitchMapping(); - assertFalse("Expected to be multi switch", + assertFalse("Expected to be multi switch " + mapping, AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping)); } + /** + * Verify the cached mapper delegates the switch mapping query to the inner + * mapping, which again handles arbitrary DNSToSwitchMapping implementations + * + * @throws Throwable on any problem + */ @Test public void testCachingRelays() throws Throwable { CachedDNSToSwitchMapping mapping = new CachedDNSToSwitchMapping(new StandaloneSwitchMapping()); - assertFalse("Expected to be multi switch", + assertFalse("Expected to be multi switch " + mapping, mapping.isSingleSwitch()); } + + /** + * Verify the cached mapper delegates the switch mapping query to the inner + * mapping, which again handles arbitrary DNSToSwitchMapping implementations + * + * @throws Throwable on any problem + */ + @Test + public void testCachingRelaysStringOperations() throws Throwable { + Configuration conf = new Configuration(); + String scriptname = "mappingscript.sh"; + conf.set(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, + scriptname); + ScriptBasedMapping scriptMapping = new ScriptBasedMapping(conf); + assertTrue("Did not find " + scriptname + " in " + scriptMapping, + scriptMapping.toString().contains(scriptname)); + CachedDNSToSwitchMapping mapping = + new CachedDNSToSwitchMapping(scriptMapping); + assertTrue("Did not find " + scriptname + " in " + mapping, + mapping.toString().contains(scriptname)); + } + + /** + * Verify the cached mapper delegates the switch mapping query to the inner + * mapping, which again handles arbitrary DNSToSwitchMapping implementations + * + * @throws Throwable on any problem + */ + @Test + public void testCachingRelaysStringOperationsToNullScript() throws Throwable { + Configuration conf = new Configuration(); + ScriptBasedMapping scriptMapping = new ScriptBasedMapping(conf); + assertTrue("Did not find " + ScriptBasedMapping.NO_SCRIPT + + " in " + scriptMapping, + scriptMapping.toString().contains(ScriptBasedMapping.NO_SCRIPT)); + CachedDNSToSwitchMapping mapping = + new CachedDNSToSwitchMapping(scriptMapping); + assertTrue("Did not find " + ScriptBasedMapping.NO_SCRIPT + + " in " + mapping, + mapping.toString().contains(ScriptBasedMapping.NO_SCRIPT)); + } + + @Test + public void testNullMapping() { + assertFalse(AbstractDNSToSwitchMapping.isMappingSingleSwitch(null)); + } + + /** + * This class does not extend the abstract switch mapping, and verifies that + * the switch mapping logic assumes that this is multi switch + */ + private static class StandaloneSwitchMapping implements DNSToSwitchMapping { @Override public List resolve(List names) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2ac39f07717..707b550de9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -144,6 +144,9 @@ Release 0.23.2 - UNRELEASED HDFS-2655. BlockReaderLocal#skip performs unnecessary IO. (Brandon Li via jitendra) + HDFS-2725. hdfs script usage information is missing the information + about "dfs" command (Prashant Sharma via stevel) + OPTIMIZATIONS BUG FIXES @@ -169,7 +172,9 @@ Release 0.23.2 - UNRELEASED HDFS-2938. Recursive delete of a large directory make namenode unresponsive. (Hari Mankude via suresh) -Release 0.23.1 - 2012-02-08 + HDFS-2969. 
ExtendedBlock.equals is incorrectly implemented (todd) + +Release 0.23.1 - 2012-02-17 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs index 314fac8fd82..4c56bb3f149 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs @@ -26,6 +26,7 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR} function print_usage(){ echo "Usage: hdfs [--config confdir] COMMAND" echo " where COMMAND is one of:" + echo " dfs run a filesystem command on the file systems supported in Hadoop." echo " namenode -format format the DFS filesystem" echo " secondarynamenode run the DFS secondary namenode" echo " namenode run the DFS namenode" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java index 035e5c4292b..05a2b6ec859 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java @@ -145,7 +145,7 @@ public class ExtendedBlock implements Writable { return false; } ExtendedBlock b = (ExtendedBlock)o; - return b.block.equals(block) || b.poolId.equals(poolId); + return b.block.equals(block) && b.poolId.equals(poolId); } @Override // Object diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java index 87c4966093e..12a9ff378bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java @@ -73,7 +73,7 @@ public class TestViewFsFileStatusHdfs { long len = FileSystemTestHelper.createFile(fHdfs, testfilename); - Configuration conf = ViewFileSystemTestSetup.configWithViewfsScheme(); + Configuration conf = ViewFileSystemTestSetup.createConfig(); ConfigUtil.addLink(conf, "/tmp", new URI(fHdfs.getUri().toString() + "/tmp")); FileSystem vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf); assertEquals(ViewFileSystem.class, vfs.getClass()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java new file mode 100644 index 00000000000..602f016dac4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import static org.junit.Assert.*; + +import org.junit.Test; + + +public class TestExtendedBlock { + static final String POOL_A = "blockpool-a"; + static final String POOL_B = "blockpool-b"; + static final Block BLOCK_1_GS1 = new Block(1L, 100L, 1L); + static final Block BLOCK_1_GS2 = new Block(1L, 100L, 2L); + static final Block BLOCK_2_GS1 = new Block(2L, 100L, 1L); + + @Test + public void testEquals() { + // Same block -> equal + assertEquals( + new ExtendedBlock(POOL_A, BLOCK_1_GS1), + new ExtendedBlock(POOL_A, BLOCK_1_GS1)); + // Different pools, same block id -> not equal + assertNotEquals( + new ExtendedBlock(POOL_A, BLOCK_1_GS1), + new ExtendedBlock(POOL_B, BLOCK_1_GS1)); + // Same pool, different block id -> not equal + assertNotEquals( + new ExtendedBlock(POOL_A, BLOCK_1_GS1), + new ExtendedBlock(POOL_A, BLOCK_2_GS1)); + // Same block, different genstamps -> equal + assertEquals( + new ExtendedBlock(POOL_A, BLOCK_1_GS1), + new ExtendedBlock(POOL_A, BLOCK_1_GS2)); + } + + private static void assertNotEquals(Object a, Object b) { + assertFalse("expected not equal: '" + a + "' and '" + b + "'", + a.equals(b)); + } +} diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1d0c0ec9b61..c0770327d42 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -39,10 +39,16 @@ Release 0.23.2 - UNRELEASED MAPREDUCE-3854. Fixed and reenabled tests related to MR child JVM's environmental variables in TestMiniMRChildTask. (Tom White via vinodkv) + MAPREDUCE-3877 Add a test to formalise the current state transitions + of the yarn lifecycle. (stevel) + OPTIMIZATIONS BUG FIXES + MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering + DeletionService threads (Jason Lowe via bobby) + MAPREDUCE-3680. FifoScheduler web service rest API can print out invalid JSON. (B Anil Kumar via tgraves) @@ -52,11 +58,16 @@ Release 0.23.2 - UNRELEASED MAPREDUCE-3864. Fix cluster setup docs for correct SecondaryNameNode HTTPS parameters. (todd) - MAPREDUCE-3856. Instances of RunningJob class givs incorrect job tracking - urls when mutiple jobs are submitted from same client jvm. (Eric Payne via - sseth) + MAPREDUCE-3583. Change pid to String and stime to BigInteger in order to + avoid NumberFormatException caused by overflow. (Zhihong Yu via szetszwo) + + MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when + their EventHandlers get exceptions. (vinodkv) + + MAPREDUCE-3798. Fixed failing TestJobCleanup.testCusomCleanup() and moved it + to the maven build. (Ravi Prakash via vinodkv) -Release 0.23.1 - 2012-02-08 +Release 0.23.1 - 2012-02-17 NEW FEATURES @@ -807,6 +818,12 @@ Release 0.23.1 - 2012-02-08 MAPREDUCE-3858. Task attempt failure during commit results in task never completing. (Tom White via mahadev) + MAPREDUCE-3856. Instances of RunningJob class givs incorrect job tracking + urls when mutiple jobs are submitted from same client jvm. (Eric Payne via + sseth) + + MAPREDUCE-3880. Changed LCE binary to be 32-bit. 
(acmurthy) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 6c45574b7dc..0dddd66d59f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -188,6 +188,8 @@ public class MRAppMaster extends CompositeService { @Override public void init(final Configuration conf) { + conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + downloadTokensAndSetupUGI(conf); context = new RunningAppContext(conf); @@ -379,6 +381,7 @@ public class MRAppMaster extends CompositeService { // this is the only job, so shut down the Appmaster // note in a workflow scenario, this may lead to creation of a new // job (FIXME?) + // Send job-end notification if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { try { LOG.info("Job end notification started for jobID : " @@ -405,7 +408,6 @@ public class MRAppMaster extends CompositeService { LOG.info("Calling stop for all the services"); stop(); - // Send job-end notification } catch (Throwable t) { LOG.warn("Graceful stop failed ", t); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java index 00bfc391ae4..ce720e1dea5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java @@ -61,7 +61,8 @@ public class ContainerLauncherEvent @Override public String toString() { - return super.toString() + " for taskAttempt " + taskAttemptID; + return super.toString() + " for container " + containerID + " taskAttempt " + + taskAttemptID; } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 1443eed6089..0befad86427 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -334,7 +334,6 @@ public class ContainerLauncherImpl extends AbstractService implements LOG.error("Returning, interrupted : " + e); return; } - int poolSize = launcherPool.getCorePoolSize(); // See if we need up the pool size only if haven't reached the diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index 3bf6e075849..8e041e890ce 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -216,26 +216,18 @@ public class RecoveryService extends CompositeService implements Recovery { protected Dispatcher createRecoveryDispatcher() { return new RecoveryDispatcher(); } - - protected Dispatcher createRecoveryDispatcher(boolean exitOnException) { - return new RecoveryDispatcher(exitOnException); - } @SuppressWarnings("rawtypes") class RecoveryDispatcher extends AsyncDispatcher { private final EventHandler actualHandler; private final EventHandler handler; - RecoveryDispatcher(boolean exitOnException) { - super(exitOnException); + RecoveryDispatcher() { + super(); actualHandler = super.getEventHandler(); handler = new InterceptingEventHandler(actualHandler); } - RecoveryDispatcher() { - this(false); - } - @Override @SuppressWarnings("unchecked") public void dispatch(Event event) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 00bdaebfe80..325adacb300 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 1b35b21559a..60ec171c5f3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Iterator; import java.util.Map; @@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import 
org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerToken; @@ -49,6 +51,7 @@ import org.junit.Test; * Tests the state machine with respect to Job/Task/TaskAttempt failure * scenarios. */ +@SuppressWarnings("unchecked") public class TestFail { @Test @@ -247,10 +250,17 @@ public class TestFail { //when attempt times out, heartbeat handler will send the lost event //leading to Attempt failure return new TaskAttemptListenerImpl(getContext(), null) { + @Override public void startRpcServer(){}; + @Override public void stopRpcServer(){}; + @Override + public InetSocketAddress getAddress() { + return NetUtils.createSocketAddr("localhost", 1234); + } public void init(Configuration conf) { - conf.setInt("mapreduce.task.timeout", 1*1000);//reduce timeout + conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout + conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000); super.init(conf); } }; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 509c49e9cc0..87fce7ece6b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -54,12 +54,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; -import org.apache.hadoop.mapreduce.v2.app.recover.Recovery; -import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.yarn.Clock; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.junit.Test; @@ -724,13 +719,6 @@ public class TestRecovery { super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); } - @Override - protected Recovery createRecoveryService(AppContext appContext) { - return new RecoveryServiceWithCustomDispatcher( - appContext.getApplicationAttemptId(), appContext.getClock(), - getCommitter()); - } - @Override protected ContainerLauncher createContainerLauncher(AppContext context) { MockContainerLauncher launcher = new MockContainerLauncher() { @@ -757,21 +745,6 @@ public class TestRecovery { } } - static class RecoveryServiceWithCustomDispatcher extends RecoveryService { - - public RecoveryServiceWithCustomDispatcher( - ApplicationAttemptId applicationAttemptId, Clock clock, - OutputCommitter committer) { - super(applicationAttemptId, clock, committer); - } - - @Override - public Dispatcher createRecoveryDispatcher() { - return super.createRecoveryDispatcher(false); - } - - } - public static void main(String[] arg) throws Exception { TestRecovery test = 
new TestRecovery(); test.testCrashed(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index f25de5c8cc2..b0ecb5c9bf7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock; import java.io.IOException; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; @@ -64,8 +65,6 @@ public class TestContainerLauncher { appId, 3); JobId jobId = MRBuilderUtils.newJobId(appId, 8); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP); - TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); - ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10); AppContext context = mock(AppContext.class); CustomContainerLauncher containerLauncher = new CustomContainerLauncher( @@ -83,6 +82,8 @@ public class TestContainerLauncher { containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; for (int i = 0; i < 10; i++) { + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, i); + TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i); containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host" + i + ":1234", null, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); @@ -92,9 +93,21 @@ public class TestContainerLauncher { Assert.assertNull(containerLauncher.foundErrors); // Same set of hosts, so no change - containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE; containerLauncher.finishEventHandling = true; + int timeOut = 0; + while (containerLauncher.numEventsProcessed.get() < 10 && timeOut++ < 200) { + LOG.info("Waiting for number of events processed to become " + 10 + + ". It is now " + containerLauncher.numEventsProcessed.get() + + ". Timeout is " + timeOut); + Thread.sleep(1000); + } + Assert.assertEquals(10, containerLauncher.numEventsProcessed.get()); + containerLauncher.finishEventHandling = false; for (int i = 0; i < 10; i++) { + ContainerId containerId = + BuilderUtils.newContainerId(appAttemptId, i + 10); + TaskAttemptId taskAttemptId = + MRBuilderUtils.newTaskAttemptId(taskId, i + 10); containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host" + i + ":1234", null, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); @@ -106,14 +119,16 @@ public class TestContainerLauncher { // Different hosts, there should be an increase in core-thread-pool size to // 21(11hosts+10buffer) // Core pool size should be 21 but the live pool size should be only 11. 
- containerLauncher.expectedCorePoolSize = 12 + ContainerLauncherImpl.INITIAL_POOL_SIZE; - for (int i = 1; i <= 2; i++) { - containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, - containerId, "host1" + i + ":1234", null, - ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); - } - waitForEvents(containerLauncher, 22); - Assert.assertEquals(12, threadPool.getPoolSize()); + containerLauncher.expectedCorePoolSize = + 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE; + containerLauncher.finishEventHandling = false; + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21); + TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21); + containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, + containerId, "host11:1234", null, + ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)); + waitForEvents(containerLauncher, 21); + Assert.assertEquals(11, threadPool.getPoolSize()); Assert.assertNull(containerLauncher.foundErrors); containerLauncher.stop(); @@ -172,15 +187,15 @@ public class TestContainerLauncher { private void waitForEvents(CustomContainerLauncher containerLauncher, int expectedNumEvents) throws InterruptedException { - int timeOut = 20; - while (expectedNumEvents != containerLauncher.numEventsProcessed - || timeOut++ < 20) { + int timeOut = 0; + while (containerLauncher.numEventsProcessing.get() < expectedNumEvents + && timeOut++ < 20) { LOG.info("Waiting for number of events to become " + expectedNumEvents - + ". It is now " + containerLauncher.numEventsProcessed); + + ". It is now " + containerLauncher.numEventsProcessing.get()); Thread.sleep(1000); } - Assert - .assertEquals(expectedNumEvents, containerLauncher.numEventsProcessed); + Assert.assertEquals(expectedNumEvents, + containerLauncher.numEventsProcessing.get()); } @Test @@ -244,9 +259,11 @@ public class TestContainerLauncher { private final class CustomContainerLauncher extends ContainerLauncherImpl { private volatile int expectedCorePoolSize = 0; - private volatile int numEventsProcessed = 0; + private AtomicInteger numEventsProcessing = new AtomicInteger(0); + private AtomicInteger numEventsProcessed = new AtomicInteger(0); private volatile String foundErrors = null; private volatile boolean finishEventHandling; + private CustomContainerLauncher(AppContext context) { super(context); } @@ -255,8 +272,38 @@ public class TestContainerLauncher { return super.launcherPool; } + private final class CustomEventProcessor extends + ContainerLauncherImpl.EventProcessor { + private final ContainerLauncherEvent event; + + private CustomEventProcessor(ContainerLauncherEvent event) { + super(event); + this.event = event; + } + + @Override + public void run() { + // do nothing substantial + + LOG.info("Processing the event " + event.toString()); + + numEventsProcessing.incrementAndGet(); + // Stall + while (!finishEventHandling) { + synchronized (this) { + try { + wait(1000); + } catch (InterruptedException e) { + ; + } + } + } + numEventsProcessed.incrementAndGet(); + } + } + protected ContainerLauncherImpl.EventProcessor createEventProcessor( - ContainerLauncherEvent event) { + final ContainerLauncherEvent event) { // At this point of time, the EventProcessor is being created and so no // additional threads would have been created. 
@@ -266,23 +313,7 @@ public class TestContainerLauncher { + launcherPool.getCorePoolSize(); } - return new ContainerLauncherImpl.EventProcessor(event) { - @Override - public void run() { - // do nothing substantial - numEventsProcessed++; - // Stall - synchronized(this) { - try { - while(!finishEventHandling) { - wait(1000); - } - } catch (InterruptedException e) { - ; - } - } - } - }; + return new CustomEventProcessor(event); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java index 58516c42197..1d02ca55594 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; +import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -91,12 +92,14 @@ public class ProcfsBasedProcessTree extends ProcessTree { // to a test directory. private String procfsDir; - private Integer pid = -1; + static private String deadPid = "-1"; + private String pid = deadPid; + static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*"); private Long cpuTime = 0L; private boolean setsidUsed = false; private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL; - private Map processTree = new HashMap(); + private Map processTree = new HashMap(); public ProcfsBasedProcessTree(String pid) { this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL); @@ -166,19 +169,19 @@ public class ProcfsBasedProcessTree extends ProcessTree { * @return the process-tree with latest state. */ public ProcfsBasedProcessTree getProcessTree() { - if (pid != -1) { + if (!pid.equals(deadPid)) { // Get the list of processes - List processList = getProcessList(); + List processList = getProcessList(); - Map allProcessInfo = new HashMap(); + Map allProcessInfo = new HashMap(); // cache the processTree to get the age for processes - Map oldProcs = - new HashMap(processTree); + Map oldProcs = + new HashMap(processTree); processTree.clear(); ProcessInfo me = null; - for (Integer proc : processList) { + for (String proc : processList) { // Get information for each process ProcessInfo pInfo = new ProcessInfo(proc); if (constructProcessInfo(pInfo, procfsDir) != null) { @@ -195,9 +198,9 @@ public class ProcfsBasedProcessTree extends ProcessTree { } // Add each process to its parent. 
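The loop that follows attaches each ProcessInfo to its parent by looking up the ppid string in the same map, skipping the init process (pid "1"). A reduced, standalone sketch of that parent/child wiring with simplified types (names invented):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProcessTreeWiringSketch {
  static class Proc {
    final String pid;
    final String ppid;
    final List<Proc> children = new ArrayList<Proc>();
    Proc(String pid, String ppid) { this.pid = pid; this.ppid = ppid; }
  }

  public static void main(String[] args) {
    Map<String, Proc> all = new HashMap<String, Proc>();
    all.put("1", new Proc("1", "0"));
    all.put("100", new Proc("100", "1"));
    all.put("200", new Proc("200", "100"));
    // Attach every process except init (pid "1") to its parent, if present.
    for (Map.Entry<String, Proc> e : all.entrySet()) {
      if (!e.getKey().equals("1")) {
        Proc parent = all.get(e.getValue().ppid);
        if (parent != null) {
          parent.children.add(e.getValue());
        }
      }
    }
    System.out.println("children of 100: " + all.get("100").children.size());
  }
}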
- for (Map.Entry entry : allProcessInfo.entrySet()) { - Integer pID = entry.getKey(); - if (pID != 1) { + for (Map.Entry entry : allProcessInfo.entrySet()) { + String pID = entry.getKey(); + if (!pID.equals("1")) { ProcessInfo pInfo = entry.getValue(); ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid()); if (parentPInfo != null) { @@ -218,7 +221,7 @@ public class ProcfsBasedProcessTree extends ProcessTree { } // update age values and compute the number of jiffies since last update - for (Map.Entry procs : processTree.entrySet()) { + for (Map.Entry procs : processTree.entrySet()) { ProcessInfo oldInfo = oldProcs.get(procs.getKey()); if (procs.getValue() != null) { procs.getValue().updateJiffy(oldInfo); @@ -242,10 +245,10 @@ public class ProcfsBasedProcessTree extends ProcessTree { * @return true if the root-process is alive, false otherwise. */ public boolean isAlive() { - if (pid == -1) { + if (pid.equals(deadPid)) { return false; } else { - return isAlive(pid.toString()); + return isAlive(pid); } } @@ -256,8 +259,8 @@ public class ProcfsBasedProcessTree extends ProcessTree { * alive, false otherwise. */ public boolean isAnyProcessInTreeAlive() { - for (Integer pId : processTree.keySet()) { - if (isAlive(pId.toString())) { + for (String pId : processTree.keySet()) { + if (isAlive(pId)) { return true; } } @@ -269,9 +272,8 @@ public class ProcfsBasedProcessTree extends ProcessTree { * @param procfsDir Procfs root dir */ static boolean checkPidPgrpidForMatch(String pidStr, String procfsDir) { - Integer pId = Integer.parseInt(pidStr); // Get information for this process - ProcessInfo pInfo = new ProcessInfo(pId); + ProcessInfo pInfo = new ProcessInfo(pidStr); pInfo = constructProcessInfo(pInfo, procfsDir); if (pInfo == null) { // process group leader may have finished execution, but we still need to @@ -279,14 +281,15 @@ public class ProcfsBasedProcessTree extends ProcessTree { return true; } + String pgrpId = pInfo.getPgrpId().toString(); //make sure that pId and its pgrpId match - if (!pInfo.getPgrpId().equals(pId)) { - LOG.warn("Unexpected: Process with PID " + pId + - " is not a process group leader."); + if (!pgrpId.equals(pidStr)) { + LOG.warn("Unexpected: Process with PID " + pidStr + + " is not a process group leader. pgrpId is: " + pInfo.getPgrpId()); return false; } if (LOG.isDebugEnabled()) { - LOG.debug(pId + " is a process group leader, as expected."); + LOG.debug(pidStr + " is a process group leader, as expected."); } return true; } @@ -324,7 +327,7 @@ public class ProcfsBasedProcessTree extends ProcessTree { */ public void destroy(boolean inBackground) { LOG.debug("Killing ProcfsBasedProcessTree of " + pid); - if (pid == -1) { + if (pid.equals(deadPid)) { return; } if (isAlive(pid.toString())) { @@ -347,7 +350,7 @@ public class ProcfsBasedProcessTree extends ProcessTree { } private static final String PROCESSTREE_DUMP_FORMAT = - "\t|- %d %d %d %d %s %d %d %d %d %s\n"; + "\t|- %s %s %d %d %s %d %d %d %d %s\n"; /** * Get a dump of the process-tree. 
@@ -458,34 +461,27 @@ public class ProcfsBasedProcessTree extends ProcessTree { return cpuTime; } - private static Integer getValidPID(String pid) { - Integer retPid = -1; - try { - retPid = Integer.parseInt(pid); - if (retPid <= 0) { - retPid = -1; - } - } catch (NumberFormatException nfe) { - retPid = -1; - } - return retPid; + private static String getValidPID(String pid) { + if (pid == null) return deadPid; + Matcher m = numberPattern.matcher(pid); + if (m.matches()) return pid; + return deadPid; } /** * Get the list of all processes in the system. */ - private List getProcessList() { + private List getProcessList() { String[] processDirs = (new File(procfsDir)).list(); - List processList = new ArrayList(); + List processList = new ArrayList(); for (String dir : processDirs) { + Matcher m = numberPattern.matcher(dir); + if (!m.matches()) continue; try { - int pd = Integer.parseInt(dir); if ((new File(procfsDir, dir)).isDirectory()) { - processList.add(Integer.valueOf(pd)); + processList.add(dir); } - } catch (NumberFormatException n) { - // skip this directory } catch (SecurityException s) { // skip this process } @@ -511,7 +507,7 @@ public class ProcfsBasedProcessTree extends ProcessTree { BufferedReader in = null; FileReader fReader = null; try { - File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid())); + File pidDir = new File(procfsDir, pinfo.getPid()); fReader = new FileReader(new File(pidDir, PROCFS_STAT_FILE)); in = new BufferedReader(fReader); } catch (FileNotFoundException f) { @@ -528,9 +524,9 @@ public class ProcfsBasedProcessTree extends ProcessTree { boolean mat = m.find(); if (mat) { // Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss) - pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), + pinfo.updateProcessInfo(m.group(2), m.group(3), Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)), - Long.parseLong(m.group(7)), Long.parseLong(m.group(8)), + Long.parseLong(m.group(7)), new BigInteger(m.group(8)), Long.parseLong(m.group(10)), Long.parseLong(m.group(11))); } else { LOG.warn("Unexpected: procfs stat file is not in the expected format" @@ -562,7 +558,7 @@ public class ProcfsBasedProcessTree extends ProcessTree { */ public String toString() { StringBuffer pTree = new StringBuffer("[ "); - for (Integer p : processTree.keySet()) { + for (String p : processTree.keySet()) { pTree.append(p); pTree.append(" "); } @@ -575,15 +571,16 @@ public class ProcfsBasedProcessTree extends ProcessTree { * */ private static class ProcessInfo { - private Integer pid; // process-id + private String pid; // process-id private String name; // command name private Integer pgrpId; // process group-id - private Integer ppid; // parent process-id + private String ppid; // parent process-id private Integer sessionId; // session-id private Long vmem; // virtual memory usage private Long rssmemPage; // rss memory usage in # of pages private Long utime = 0L; // # of jiffies in user mode - private Long stime = 0L; // # of jiffies in kernel mode + private final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE); + private BigInteger stime = new BigInteger("0"); // # of jiffies in kernel mode // how many times has this process been seen alive private int age; @@ -595,13 +592,13 @@ public class ProcfsBasedProcessTree extends ProcessTree { private List children = new ArrayList(); // list of children - public ProcessInfo(int pid) { - this.pid = Integer.valueOf(pid); + public ProcessInfo(String pid) { + this.pid = pid; // seeing this the first time. 
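getValidPID and getProcessList above now keep PIDs as strings and accept only names matching [1-9][0-9]*, which also filters out non-numeric /proc entries such as "self". A standalone sketch of the same validation and directory scan (class name and paths are illustrative):

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PidScanSketch {
  private static final String DEAD_PID = "-1";
  private static final Pattern NUMBER = Pattern.compile("[1-9][0-9]*");

  static String validPid(String pid) {
    if (pid == null) return DEAD_PID;
    Matcher m = NUMBER.matcher(pid);
    return m.matches() ? pid : DEAD_PID;
  }

  static List<String> listPids(String procfsDir) {
    List<String> pids = new ArrayList<String>();
    String[] entries = new File(procfsDir).list();
    if (entries == null) return pids;
    for (String dir : entries) {
      // Keep only purely numeric directory names, e.g. "4213", not "self".
      if (NUMBER.matcher(dir).matches()
          && new File(procfsDir, dir).isDirectory()) {
        pids.add(dir);
      }
    }
    return pids;
  }

  public static void main(String[] args) {
    System.out.println(validPid("4213"));   // 4213
    System.out.println(validPid("0"));      // -1
    System.out.println(listPids("/proc").size());
  }
}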
this.age = 1; } - public Integer getPid() { + public String getPid() { return pid; } @@ -613,7 +610,7 @@ public class ProcfsBasedProcessTree extends ProcessTree { return pgrpId; } - public Integer getPpid() { + public String getPpid() { return ppid; } @@ -629,7 +626,7 @@ public class ProcfsBasedProcessTree extends ProcessTree { return utime; } - public Long getStime() { + public BigInteger getStime() { return stime; } @@ -652,8 +649,8 @@ public class ProcfsBasedProcessTree extends ProcessTree { return false; } - public void updateProcessInfo(String name, Integer ppid, Integer pgrpId, - Integer sessionId, Long utime, Long stime, Long vmem, Long rssmem) { + public void updateProcessInfo(String name, String ppid, Integer pgrpId, + Integer sessionId, Long utime, BigInteger stime, Long vmem, Long rssmem) { this.name = name; this.ppid = ppid; this.pgrpId = pgrpId; @@ -665,8 +662,19 @@ public class ProcfsBasedProcessTree extends ProcessTree { } public void updateJiffy(ProcessInfo oldInfo) { - this.dtime = (oldInfo == null ? this.utime + this.stime - : (this.utime + this.stime) - (oldInfo.utime + oldInfo.stime)); + if (oldInfo == null) { + BigInteger sum = this.stime.add(BigInteger.valueOf(this.utime)); + if (sum.compareTo(MAX_LONG) > 0) { + this.dtime = 0L; + LOG.warn("Sum of stime (" + this.stime + ") and utime (" + this.utime + + ") is greater than " + Long.MAX_VALUE); + } else { + this.dtime = sum.longValue(); + } + return; + } + this.dtime = (this.utime - oldInfo.utime + + this.stime.subtract(oldInfo.stime).longValue()); } public void updateAge(ProcessInfo oldInfo) { @@ -690,7 +698,7 @@ public class ProcfsBasedProcessTree extends ProcessTree { FileReader fReader = null; try { fReader = - new FileReader(new File(new File(procfsDir, pid.toString()), + new FileReader(new File(new File(procfsDir, pid), PROCFS_CMDLINE_FILE)); } catch (FileNotFoundException f) { // The process vanished in the interim! 
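updateJiffy above performs the stime + utime addition in BigInteger first so that an overflowing sum is detected and reported rather than silently wrapping a long. A minimal sketch of that guard with made-up values:

import java.math.BigInteger;

public class JiffySumSketch {
  private static final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE);

  // Returns utime + stime as a long, or 0 with a warning when it would overflow.
  static long safeSum(long utime, BigInteger stime) {
    BigInteger sum = stime.add(BigInteger.valueOf(utime));
    if (sum.compareTo(MAX_LONG) > 0) {
      System.err.println("Sum of stime (" + stime + ") and utime (" + utime
          + ") is greater than " + Long.MAX_VALUE);
      return 0L;
    }
    return sum.longValue();
  }

  public static void main(String[] args) {
    System.out.println(safeSum(100L, BigInteger.valueOf(250L)));  // 350
    System.out.println(safeSum(10L, MAX_LONG));                   // 0, with warning
  }
}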
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java index c1626f9531e..b420e3dfbe3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java @@ -30,6 +30,7 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.service.CompositeService; /****************************************************************** @@ -51,6 +52,9 @@ public class JobHistoryServer extends CompositeService { @Override public synchronized void init(Configuration conf) { Configuration config = new YarnConfiguration(conf); + + config.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + try { doSecureLogin(conf); } catch(IOException ie) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java index ec2fe5ea3ae..58ab9c8117e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java @@ -53,7 +53,6 @@ public class TestJobHistoryEvents { @Test public void testHistoryEvents() throws Exception { Configuration conf = new Configuration(); - conf.set(MRJobConfig.USER_NAME, "test"); MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true); app.submit(conf); Job job = app.getContext().getAllJobs().values().iterator().next(); @@ -102,7 +101,6 @@ public class TestJobHistoryEvents { public void testEventsFlushOnStop() throws Exception { Configuration conf = new Configuration(); - conf.set(MRJobConfig.USER_NAME, "test"); MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this .getClass().getName(), true); app.submit(conf); diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java similarity index 55% rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java index 3635892d639..6139fdb05f1 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java @@ -22,10 +22,8 @@ import java.io.DataOutputStream; import java.io.File; import 
java.io.IOException; -import junit.extensions.TestSetup; -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,64 +31,71 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; /** * A JUnit test to test Map-Reduce job cleanup. */ -public class TestJobCleanup extends TestCase { - private static String TEST_ROOT_DIR = - new File(System.getProperty("test.build.data", "/tmp") + "/" - + "test-job-cleanup").toString(); - private static final String CUSTOM_CLEANUP_FILE_NAME = - "_custom_cleanup"; - private static final String ABORT_KILLED_FILE_NAME = - "_custom_abort_killed"; - private static final String ABORT_FAILED_FILE_NAME = - "_custom_abort_failed"; +@SuppressWarnings("deprecation") +public class TestJobCleanup { + private static String TEST_ROOT_DIR = new File(System.getProperty( + "test.build.data", "/tmp") + "/" + "test-job-cleanup").toString(); + private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup"; + private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed"; + private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed"; private static FileSystem fileSys = null; private static MiniMRCluster mr = null; private static Path inDir = null; private static Path emptyInDir = null; private static int outDirs = 0; - - public static Test suite() { - TestSetup setup = new TestSetup(new TestSuite(TestJobCleanup.class)) { - protected void setUp() throws Exception { - JobConf conf = new JobConf(); - fileSys = FileSystem.get(conf); - fileSys.delete(new Path(TEST_ROOT_DIR), true); - conf.set("mapred.job.tracker.handler.count", "1"); - conf.set("mapred.job.tracker", "127.0.0.1:0"); - conf.set("mapred.job.tracker.http.address", "127.0.0.1:0"); - conf.set("mapred.task.tracker.http.address", "127.0.0.1:0"); - mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); - inDir = new Path(TEST_ROOT_DIR, "test-input"); - String input = "The quick brown fox\n" + "has many silly\n" - + "red fox sox\n"; - DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0)); - file.writeBytes(input); - file.close(); - emptyInDir = new Path(TEST_ROOT_DIR, "empty-input"); - fileSys.mkdirs(emptyInDir); - } - - protected void tearDown() throws Exception { - if (fileSys != null) { - fileSys.delete(new Path(TEST_ROOT_DIR), true); - fileSys.close(); - } - if (mr != null) { - mr.shutdown(); - } - } - }; - return setup; + private static Log LOG = LogFactory.getLog(TestJobCleanup.class); + + @BeforeClass + public static void setUp() throws IOException { + JobConf conf = new JobConf(); + fileSys = FileSystem.get(conf); + fileSys.delete(new Path(TEST_ROOT_DIR), true); + conf.set("mapred.job.tracker.handler.count", "1"); + conf.set("mapred.job.tracker", "127.0.0.1:0"); + conf.set("mapred.job.tracker.http.address", "127.0.0.1:0"); + conf.set("mapred.task.tracker.http.address", "127.0.0.1:0"); + conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, TEST_ROOT_DIR + + "/intermediate"); + 
conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter + .SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true"); + + mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); + inDir = new Path(TEST_ROOT_DIR, "test-input"); + String input = "The quick brown fox\n" + "has many silly\n" + + "red fox sox\n"; + DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0)); + file.writeBytes(input); + file.close(); + emptyInDir = new Path(TEST_ROOT_DIR, "empty-input"); + fileSys.mkdirs(emptyInDir); } - - /** - * Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)} - * making a _failed/_killed in the output folder + + @AfterClass + public static void tearDown() throws Exception { + if (fileSys != null) { + // fileSys.delete(new Path(TEST_ROOT_DIR), true); + fileSys.close(); + } + if (mr != null) { + mr.shutdown(); + } + } + + /** + * Committer with deprecated + * {@link FileOutputCommitter#cleanupJob(JobContext)} making a _failed/_killed + * in the output folder */ static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter { @Override @@ -101,31 +106,40 @@ public class TestJobCleanup extends TestCase { FileSystem fs = outputPath.getFileSystem(conf); fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close(); } + + @Override + public void commitJob(JobContext context) throws IOException { + cleanupJob(context); + } + + @Override + public void abortJob(JobContext context, int i) throws IOException { + cleanupJob(context); + } } - - /** + + /** * Committer with abort making a _failed/_killed in the output folder */ static class CommitterWithCustomAbort extends FileOutputCommitter { @Override - public void abortJob(JobContext context, int state) - throws IOException { - JobConf conf = context.getJobConf();; + public void abortJob(JobContext context, int state) throws IOException { + JobConf conf = context.getJobConf(); + ; Path outputPath = FileOutputFormat.getOutputPath(conf); FileSystem fs = outputPath.getFileSystem(conf); - String fileName = (state == JobStatus.FAILED) - ? TestJobCleanup.ABORT_FAILED_FILE_NAME - : TestJobCleanup.ABORT_KILLED_FILE_NAME; + String fileName = (state == JobStatus.FAILED) ? 
TestJobCleanup.ABORT_FAILED_FILE_NAME + : TestJobCleanup.ABORT_KILLED_FILE_NAME; fs.create(new Path(outputPath, fileName)).close(); } } - + private Path getNewOutputDir() { return new Path(TEST_ROOT_DIR, "output-" + outDirs++); } - - private void configureJob(JobConf jc, String jobName, int maps, int reds, - Path outDir) { + + private void configureJob(JobConf jc, String jobName, int maps, int reds, + Path outDir) { jc.setJobName(jobName); jc.setInputFormat(TextInputFormat.class); jc.setOutputKeyClass(LongWritable.class); @@ -137,36 +151,38 @@ public class TestJobCleanup extends TestCase { jc.setNumMapTasks(maps); jc.setNumReduceTasks(reds); } - + // run a job with 1 map and let it run to completion - private void testSuccessfulJob(String filename, - Class committer, String[] exclude) - throws IOException { + private void testSuccessfulJob(String filename, + Class committer, String[] exclude) + throws IOException { JobConf jc = mr.createJobConf(); Path outDir = getNewOutputDir(); configureJob(jc, "job with cleanup()", 1, 0, outDir); jc.setOutputCommitter(committer); - + JobClient jobClient = new JobClient(jc); RunningJob job = jobClient.submitJob(jc); JobID id = job.getID(); job.waitForCompletion(); - + + LOG.info("Job finished : " + job.isComplete()); Path testFile = new Path(outDir, filename); - assertTrue("Done file missing for job " + id, fileSys.exists(testFile)); - + assertTrue("Done file \"" + testFile + "\" missing for job " + id, + fileSys.exists(testFile)); + // check if the files from the missing set exists for (String ex : exclude) { Path file = new Path(outDir, ex); - assertFalse("File " + file + " should not be present for successful job " - + id, fileSys.exists(file)); + assertFalse("File " + file + " should not be present for successful job " + + id, fileSys.exists(file)); } } - + // run a job for which all the attempts simply fail. - private void testFailedJob(String fileName, - Class committer, String[] exclude) - throws IOException { + private void testFailedJob(String fileName, + Class committer, String[] exclude) + throws IOException { JobConf jc = mr.createJobConf(); Path outDir = getNewOutputDir(); configureJob(jc, "fail job with abort()", 1, 0, outDir); @@ -179,128 +195,129 @@ public class TestJobCleanup extends TestCase { RunningJob job = jobClient.submitJob(jc); JobID id = job.getID(); job.waitForCompletion(); - + if (fileName != null) { Path testFile = new Path(outDir, fileName); - assertTrue("File " + testFile + " missing for failed job " + id, - fileSys.exists(testFile)); + assertTrue("File " + testFile + " missing for failed job " + id, + fileSys.exists(testFile)); } - + // check if the files from the missing set exists for (String ex : exclude) { Path file = new Path(outDir, ex); assertFalse("File " + file + " should not be present for failed job " - + id, fileSys.exists(file)); + + id, fileSys.exists(file)); } } - + // run a job which gets stuck in mapper and kill it. 
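The testKilledJob that follows waits for JobCounter.TOTAL_LAUNCHED_MAPS to reach one before killing the job, since there is no longer a JobTracker to ask for running maps. A reduced sketch of that wait-then-kill pattern, without the MiniMRCluster setup (class and method names invented):

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.JobCounter;

public class KillAfterLaunchSketch {
  // Submits jc, waits until one map task has launched, then kills the job.
  static void runAndKill(JobConf jc) throws Exception {
    JobClient client = new JobClient(jc);
    RunningJob job = client.submitJob(jc);
    Counters counters = job.getCounters();
    // Poll the counters until the first map attempt has actually launched.
    while (counters.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS) < 1) {
      Thread.sleep(100);
      counters = job.getCounters();
    }
    job.killJob();
    job.waitForCompletion();
  }
}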
private void testKilledJob(String fileName, - Class committer, String[] exclude) - throws IOException { + Class committer, String[] exclude) + throws IOException { JobConf jc = mr.createJobConf(); Path outDir = getNewOutputDir(); configureJob(jc, "kill job with abort()", 1, 0, outDir); // set the job to wait for long jc.setMapperClass(UtilsForTests.KillMapper.class); jc.setOutputCommitter(committer); - + JobClient jobClient = new JobClient(jc); RunningJob job = jobClient.submitJob(jc); JobID id = job.getID(); - JobInProgress jip = - mr.getJobTrackerRunner().getJobTracker().getJob(job.getID()); - + + Counters counters = job.getCounters(); + // wait for the map to be launched while (true) { - if (jip.runningMaps() == 1) { + if (counters.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS) == 1) { break; } + LOG.info("Waiting for a map task to be launched"); UtilsForTests.waitFor(100); + counters = job.getCounters(); } - + job.killJob(); // kill the job - + job.waitForCompletion(); // wait for the job to complete - + if (fileName != null) { Path testFile = new Path(outDir, fileName); - assertTrue("File " + testFile + " missing for job " + id, - fileSys.exists(testFile)); + assertTrue("File " + testFile + " missing for job " + id, + fileSys.exists(testFile)); } - + // check if the files from the missing set exists for (String ex : exclude) { Path file = new Path(outDir, ex); assertFalse("File " + file + " should not be present for killed job " - + id, fileSys.exists(file)); + + id, fileSys.exists(file)); } } - + /** * Test default cleanup/abort behavior - * + * * @throws IOException */ + @Test public void testDefaultCleanupAndAbort() throws IOException { // check with a successful job testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, - FileOutputCommitter.class, - new String[] {}); - + FileOutputCommitter.class, new String[] {}); + // check with a failed job - testFailedJob(null, - FileOutputCommitter.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); - + testFailedJob(null, FileOutputCommitter.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME }); + // check default abort job kill - testKilledJob(null, - FileOutputCommitter.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); + testKilledJob(null, FileOutputCommitter.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME }); } - + /** * Test if a failed job with custom committer runs the abort code. 
- * + * * @throws IOException */ + @Test public void testCustomAbort() throws IOException { // check with a successful job - testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, - CommitterWithCustomAbort.class, - new String[] {ABORT_FAILED_FILE_NAME, - ABORT_KILLED_FILE_NAME}); - + testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME, + CommitterWithCustomAbort.class, new String[] { ABORT_FAILED_FILE_NAME, + ABORT_KILLED_FILE_NAME }); + // check with a failed job - testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, - ABORT_KILLED_FILE_NAME}); - + testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME, + ABORT_KILLED_FILE_NAME }); + // check with a killed job - testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME, - ABORT_FAILED_FILE_NAME}); + testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class, + new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME, + ABORT_FAILED_FILE_NAME }); } /** * Test if a failed job with custom committer runs the deprecated - * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api + * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api * compatibility testing. */ + @Test public void testCustomCleanup() throws IOException { // check with a successful job - testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, - CommitterWithCustomDeprecatedCleanup.class, - new String[] {}); - - // check with a failed job - testFailedJob(CUSTOM_CLEANUP_FILE_NAME, - CommitterWithCustomDeprecatedCleanup.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); - - // check with a killed job - testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME, - CommitterWithCustomDeprecatedCleanup.class, - new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); + testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME, + CommitterWithCustomDeprecatedCleanup.class, + new String[] {}); + + // check with a failed job + testFailedJob(CUSTOM_CLEANUP_FILE_NAME, + CommitterWithCustomDeprecatedCleanup.class, + new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); + + // check with a killed job + testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME, + CommitterWithCustomDeprecatedCleanup.class, + new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME}); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 4b17c1e943e..799e20092d4 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -193,6 +193,12 @@ + + + + + + diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index ffa2e9cfc60..1b3a76a4778 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.service.AbstractService; @@ -48,22 +49,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { private boolean exitOnDispatchException; public AsyncDispatcher() { - this(new HashMap, EventHandler>(), - new LinkedBlockingQueue(), true); - } - - public AsyncDispatcher(boolean exitOnException) { - this(new HashMap, EventHandler>(), - new LinkedBlockingQueue(), exitOnException); + this(new LinkedBlockingQueue()); } - AsyncDispatcher( - Map, EventHandler> eventDispatchers, - BlockingQueue eventQueue, boolean exitOnException) { + public AsyncDispatcher(BlockingQueue eventQueue) { super("Dispatcher"); this.eventQueue = eventQueue; - this.eventDispatchers = eventDispatchers; - this.exitOnDispatchException = exitOnException; + this.eventDispatchers = new HashMap, EventHandler>(); } Runnable createThread() { @@ -86,6 +78,14 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { }; } + @Override + public synchronized void init(Configuration conf) { + this.exitOnDispatchException = + conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, + Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); + super.init(conf); + } + @Override public void start() { //start all the components @@ -103,7 +103,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { try { eventHandlingThread.join(); } catch (InterruptedException ie) { - LOG.debug("Interrupted Exception while stopping", ie); + LOG.warn("Interrupted Exception while stopping", ie); } } @@ -126,8 +126,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } catch (Throwable t) { //TODO Maybe log the state of the queue - LOG.fatal("Error in dispatcher thread. Exiting..", t); + LOG.fatal("Error in dispatcher thread", t); if (exitOnDispatchException) { + LOG.info("Exiting, bbye.."); System.exit(-1); } } @@ -177,6 +178,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { try { eventQueue.put(event); } catch (InterruptedException e) { + LOG.warn("AsyncDispatcher thread interrupted", e); throw new YarnException(e); } }; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java index 40c07755a98..f87f1b26c78 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java @@ -23,8 +23,18 @@ package org.apache.hadoop.yarn.event; * event handlers based on event types. * */ +@SuppressWarnings("rawtypes") public interface Dispatcher { + // Configuration to make sure dispatcher crashes but doesn't do system-exit in + // case of errors. By default, it should be false, so that tests are not + // affected. For all daemons it should be explicitly set to true so that + // daemons can crash instead of hanging around. 
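The constants declared just below back the behaviour described in the comment above: daemons set yarn.dispatcher.exit-on-error to true, while tests keep the default of false. A minimal sketch of a daemon flipping the flag before initializing its dispatcher (class name invented):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;

public class DaemonDispatcherSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Daemons exit on dispatcher errors; tests keep the default (false).
    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);   // the flag is read in init()
    dispatcher.start();
    dispatcher.stop();
  }
}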
+ public static final String DISPATCHER_EXIT_ON_ERROR_KEY = + "yarn.dispatcher.exit-on-error"; + + public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false; + EventHandler getEventHandler(); void register(Class eventType, EventHandler handler); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java index db5f532987f..db65ad20cb8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; +import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; @@ -91,12 +92,14 @@ public class ProcfsBasedProcessTree { // to a test directory. private String procfsDir; - protected final Integer pid; + static private String deadPid = "-1"; + private String pid = deadPid; + static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*"); private Long cpuTime = 0L; private boolean setsidUsed = false; - protected Map processTree = - new HashMap(); + protected Map processTree = + new HashMap(); public ProcfsBasedProcessTree(String pid) { this(pid, false); @@ -150,19 +153,19 @@ public class ProcfsBasedProcessTree { * @return the process-tree with latest state. */ public ProcfsBasedProcessTree getProcessTree() { - if (pid != -1) { + if (!pid.equals(deadPid)) { // Get the list of processes - List processList = getProcessList(); + List processList = getProcessList(); - Map allProcessInfo = new HashMap(); + Map allProcessInfo = new HashMap(); // cache the processTree to get the age for processes - Map oldProcs = - new HashMap(processTree); + Map oldProcs = + new HashMap(processTree); processTree.clear(); ProcessInfo me = null; - for (Integer proc : processList) { + for (String proc : processList) { // Get information for each process ProcessInfo pInfo = new ProcessInfo(proc); if (constructProcessInfo(pInfo, procfsDir) != null) { @@ -179,9 +182,9 @@ public class ProcfsBasedProcessTree { } // Add each process to its parent. 
- for (Map.Entry entry : allProcessInfo.entrySet()) { - Integer pID = entry.getKey(); - if (pID != 1) { + for (Map.Entry entry : allProcessInfo.entrySet()) { + String pID = entry.getKey(); + if (!pID.equals("1")) { ProcessInfo pInfo = entry.getValue(); ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid()); if (parentPInfo != null) { @@ -202,7 +205,7 @@ public class ProcfsBasedProcessTree { } // update age values and compute the number of jiffies since last update - for (Map.Entry procs : processTree.entrySet()) { + for (Map.Entry procs : processTree.entrySet()) { ProcessInfo oldInfo = oldProcs.get(procs.getKey()); if (procs.getValue() != null) { procs.getValue().updateJiffy(oldInfo); @@ -227,20 +230,22 @@ public class ProcfsBasedProcessTree { return checkPidPgrpidForMatch(pid, PROCFS); } - public static boolean checkPidPgrpidForMatch(int _pid, String procfs) { + public static boolean checkPidPgrpidForMatch(String _pid, String procfs) { // Get information for this process ProcessInfo pInfo = new ProcessInfo(_pid); pInfo = constructProcessInfo(pInfo, procfs); // null if process group leader finished execution; issue no warning // make sure that pid and its pgrpId match - return pInfo == null || pInfo.getPgrpId().equals(_pid); + if (pInfo == null) return true; + String pgrpId = pInfo.getPgrpId().toString(); + return pgrpId.equals(_pid); } private static final String PROCESSTREE_DUMP_FORMAT = - "\t|- %d %d %d %d %s %d %d %d %d %s\n"; + "\t|- %s %s %d %d %s %d %d %d %d %s\n"; - public List getCurrentProcessIDs() { - List currentPIDs = new ArrayList(); + public List getCurrentProcessIDs() { + List currentPIDs = new ArrayList(); currentPIDs.addAll(processTree.keySet()); return currentPIDs; } @@ -354,34 +359,27 @@ public class ProcfsBasedProcessTree { return cpuTime; } - private static Integer getValidPID(String pid) { - Integer retPid = -1; - try { - retPid = Integer.parseInt(pid); - if (retPid <= 0) { - retPid = -1; - } - } catch (NumberFormatException nfe) { - retPid = -1; - } - return retPid; + private static String getValidPID(String pid) { + if (pid == null) return deadPid; + Matcher m = numberPattern.matcher(pid); + if (m.matches()) return pid; + return deadPid; } /** * Get the list of all processes in the system. 
*/ - private List getProcessList() { + private List getProcessList() { String[] processDirs = (new File(procfsDir)).list(); - List processList = new ArrayList(); + List processList = new ArrayList(); for (String dir : processDirs) { + Matcher m = numberPattern.matcher(dir); + if (!m.matches()) continue; try { - int pd = Integer.parseInt(dir); if ((new File(procfsDir, dir)).isDirectory()) { - processList.add(Integer.valueOf(pd)); + processList.add(dir); } - } catch (NumberFormatException n) { - // skip this directory } catch (SecurityException s) { // skip this process } @@ -407,7 +405,7 @@ public class ProcfsBasedProcessTree { BufferedReader in = null; FileReader fReader = null; try { - File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid())); + File pidDir = new File(procfsDir, pinfo.getPid()); fReader = new FileReader(new File(pidDir, PROCFS_STAT_FILE)); in = new BufferedReader(fReader); } catch (FileNotFoundException f) { @@ -424,9 +422,9 @@ public class ProcfsBasedProcessTree { boolean mat = m.find(); if (mat) { // Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss) - pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), + pinfo.updateProcessInfo(m.group(2), m.group(3), Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)), - Long.parseLong(m.group(7)), Long.parseLong(m.group(8)), + Long.parseLong(m.group(7)), new BigInteger(m.group(8)), Long.parseLong(m.group(10)), Long.parseLong(m.group(11))); } else { LOG.warn("Unexpected: procfs stat file is not in the expected format" @@ -458,7 +456,7 @@ public class ProcfsBasedProcessTree { */ public String toString() { StringBuffer pTree = new StringBuffer("[ "); - for (Integer p : processTree.keySet()) { + for (String p : processTree.keySet()) { pTree.append(p); pTree.append(" "); } @@ -471,15 +469,16 @@ public class ProcfsBasedProcessTree { * */ private static class ProcessInfo { - private Integer pid; // process-id + private String pid; // process-id private String name; // command name private Integer pgrpId; // process group-id - private Integer ppid; // parent process-id + private String ppid; // parent process-id private Integer sessionId; // session-id private Long vmem; // virtual memory usage private Long rssmemPage; // rss memory usage in # of pages private Long utime = 0L; // # of jiffies in user mode - private Long stime = 0L; // # of jiffies in kernel mode + private final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE); + private BigInteger stime = new BigInteger("0"); // # of jiffies in kernel mode // how many times has this process been seen alive private int age; @@ -491,13 +490,13 @@ public class ProcfsBasedProcessTree { private List children = new ArrayList(); // list of children - public ProcessInfo(int pid) { - this.pid = Integer.valueOf(pid); + public ProcessInfo(String pid) { + this.pid = pid; // seeing this the first time. 
this.age = 1; } - public Integer getPid() { + public String getPid() { return pid; } @@ -509,7 +508,7 @@ public class ProcfsBasedProcessTree { return pgrpId; } - public Integer getPpid() { + public String getPpid() { return ppid; } @@ -525,7 +524,7 @@ public class ProcfsBasedProcessTree { return utime; } - public Long getStime() { + public BigInteger getStime() { return stime; } @@ -548,8 +547,8 @@ public class ProcfsBasedProcessTree { return false; } - public void updateProcessInfo(String name, Integer ppid, Integer pgrpId, - Integer sessionId, Long utime, Long stime, Long vmem, Long rssmem) { + public void updateProcessInfo(String name, String ppid, Integer pgrpId, + Integer sessionId, Long utime, BigInteger stime, Long vmem, Long rssmem) { this.name = name; this.ppid = ppid; this.pgrpId = pgrpId; @@ -559,10 +558,21 @@ public class ProcfsBasedProcessTree { this.vmem = vmem; this.rssmemPage = rssmem; } - + public void updateJiffy(ProcessInfo oldInfo) { - this.dtime = (oldInfo == null ? this.utime + this.stime - : (this.utime + this.stime) - (oldInfo.utime + oldInfo.stime)); + if (oldInfo == null) { + BigInteger sum = this.stime.add(BigInteger.valueOf(this.utime)); + if (sum.compareTo(MAX_LONG) > 0) { + this.dtime = 0L; + LOG.warn("Sum of stime (" + this.stime + ") and utime (" + this.utime + + ") is greater than " + Long.MAX_VALUE); + } else { + this.dtime = sum.longValue(); + } + return; + } + this.dtime = (this.utime - oldInfo.utime + + this.stime.subtract(oldInfo.stime).longValue()); } public void updateAge(ProcessInfo oldInfo) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 20d7dfca94c..e79e7b360ef 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -17,10 +17,8 @@ */ package org.apache.hadoop.yarn.event; -import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { @@ -36,7 +34,7 @@ public class DrainDispatcher extends AsyncDispatcher { } private DrainDispatcher(BlockingQueue eventQueue) { - super(new HashMap, EventHandler>(), eventQueue, true); + super(eventQueue); this.queue = eventQueue; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java new file mode 100644 index 00000000000..5907f39d29e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.conf.Configuration; + +/** + * This is a service that can be configured to break on any of the lifecycle + * events, so test the failure handling of other parts of the service + * infrastructure. + * + * It retains a counter to the number of times each entry point is called - + * these counters are incremented before the exceptions are raised and + * before the superclass state methods are invoked. + * + */ + + +public class BreakableService extends AbstractService { + + private boolean failOnInit; + private boolean failOnStart; + private boolean failOnStop; + private int[] counts = new int[4]; + + public BreakableService() { + this(false, false, false); + } + + public BreakableService(boolean failOnInit, + boolean failOnStart, + boolean failOnStop) { + super("BreakableService"); + this.failOnInit = failOnInit; + this.failOnStart = failOnStart; + this.failOnStop = failOnStop; + inc(STATE.NOTINITED); + } + + private int convert(STATE state) { + switch (state) { + case NOTINITED: return 0; + case INITED: return 1; + case STARTED: return 2; + case STOPPED: return 3; + default: return 0; + } + } + + private void inc(STATE state) { + int index = convert(state); + counts[index] ++; + } + + public int getCount(STATE state) { + return counts[convert(state)]; + } + + private void maybeFail(boolean fail, String action) { + if (fail) { + throw new BrokenLifecycleEvent(action); + } + } + + @Override + public void init(Configuration conf) { + inc(STATE.INITED); + maybeFail(failOnInit, "init"); + super.init(conf); + } + + @Override + public void start() { + inc(STATE.STARTED); + maybeFail(failOnStart, "start"); + super.start(); + } + + @Override + public void stop() { + inc(STATE.STOPPED); + maybeFail(failOnStop, "stop"); + super.stop(); + } + + public void setFailOnInit(boolean failOnInit) { + this.failOnInit = failOnInit; + } + + public void setFailOnStart(boolean failOnStart) { + this.failOnStart = failOnStart; + } + + public void setFailOnStop(boolean failOnStop) { + this.failOnStop = failOnStop; + } + + /** + * The exception explicitly raised on a failure + */ + public static class BrokenLifecycleEvent extends RuntimeException { + BrokenLifecycleEvent(String action) { + super("Lifecycle Failure during " + action); + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/ServiceAssert.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/ServiceAssert.java new file mode 100644 index 00000000000..7a45aa3985b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/ServiceAssert.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.junit.Assert; + +/** + * A set of assertions about the state of any service + */ +public class ServiceAssert extends Assert { + + public static void assertServiceStateCreated(Service service) { + assertServiceInState(service, Service.STATE.NOTINITED); + } + + public static void assertServiceStateInited(Service service) { + assertServiceInState(service, Service.STATE.INITED); + } + + public static void assertServiceStateStarted(Service service) { + assertServiceInState(service, Service.STATE.STARTED); + } + + public static void assertServiceStateStopped(Service service) { + assertServiceInState(service, Service.STATE.STOPPED); + } + + public static void assertServiceInState(Service service, Service.STATE state) { + assertNotNull("Null service", service); + assertEquals("Service in wrong state: " + service, state, + service.getServiceState()); + } +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java new file mode 100644 index 00000000000..7c9655e5163 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +public class TestServiceLifecycle extends ServiceAssert { + + void assertStateCount(BreakableService service, + Service.STATE state, + int expected) { + int actual = service.getCount(state); + if (expected != actual) { + fail("Expected entry count for state [" + state +"] of " + service + + " to be " + expected + " but was " + actual); + } + } + + + @Test + public void testWalkthrough() throws Throwable { + + BreakableService svc = new BreakableService(); + assertServiceStateCreated(svc); + assertStateCount(svc, Service.STATE.NOTINITED, 1); + assertStateCount(svc, Service.STATE.INITED, 0); + assertStateCount(svc, Service.STATE.STARTED, 0); + assertStateCount(svc, Service.STATE.STOPPED, 0); + svc.init(new Configuration()); + assertServiceStateInited(svc); + assertStateCount(svc, Service.STATE.INITED, 1); + svc.start(); + assertServiceStateStarted(svc); + assertStateCount(svc, Service.STATE.STARTED, 1); + svc.stop(); + assertServiceStateStopped(svc); + assertStateCount(svc, Service.STATE.STOPPED, 1); + } + + /** + * call init twice + * @throws Throwable + */ + @Test + public void testInitTwice() throws Throwable { + BreakableService svc = new BreakableService(); + svc.init(new Configuration()); + try { + svc.init(new Configuration()); + fail("Expected a failure, got " + svc); + } catch (IllegalStateException e) { + //expected + } + assertStateCount(svc, Service.STATE.INITED, 2); + } + + /** + * call start twice + * @throws Throwable + */ + @Test + public void testStartTwice() throws Throwable { + BreakableService svc = new BreakableService(); + svc.init(new Configuration()); + svc.start(); + try { + svc.start(); + fail("Expected a failure, got " + svc); + } catch (IllegalStateException e) { + //expected + } + assertStateCount(svc, Service.STATE.STARTED, 2); + } + + + /** + * verify that when a service is stopped more than once, no exception + * is thrown, and the counter is incremented + * this is because the state change operations happen after the counter in + * the subclass is incremented, even though stop is meant to be a no-op + * @throws Throwable + */ + @Test + public void testStopTwice() throws Throwable { + BreakableService svc = new BreakableService(); + svc.init(new Configuration()); + svc.start(); + svc.stop(); + assertStateCount(svc, Service.STATE.STOPPED, 1); + svc.stop(); + assertStateCount(svc, Service.STATE.STOPPED, 2); + } + + + /** + * Show that if the service failed during an init + * operation, it stays in the created state, even after stopping it + * @throws Throwable + */ + + @Test + public void testStopFailedInit() throws Throwable { + BreakableService svc = new BreakableService(true, false, false); + assertServiceStateCreated(svc); + try { + svc.init(new Configuration()); + fail("Expected a failure, got " + svc); + } catch (BreakableService.BrokenLifecycleEvent e) { + //expected + } + //the service state wasn't passed + assertServiceStateCreated(svc); + assertStateCount(svc, Service.STATE.INITED, 1); + //now try to stop + svc.stop(); + //even after the stop operation, we haven't entered the state + assertServiceStateCreated(svc); + } + + + /** + * Show that if the service failed during an init + * operation, it stays in the created state, even after stopping it + * @throws Throwable + */ + + @Test + public void testStopFailedStart() throws Throwable { + BreakableService svc = new BreakableService(false, true, false); + svc.init(new 
Configuration()); + assertServiceStateInited(svc); + try { + svc.start(); + fail("Expected a failure, got " + svc); + } catch (BreakableService.BrokenLifecycleEvent e) { + //expected + } + //the service state wasn't passed + assertServiceStateInited(svc); + assertStateCount(svc, Service.STATE.INITED, 1); + //now try to stop + svc.stop(); + //even after the stop operation, we haven't entered the state + assertServiceStateInited(svc); + } + + /** + * verify that when a service is stopped more than once, no exception + * is thrown, and the counter is incremented + * this is because the state change operations happen after the counter in + * the subclass is incremented, even though stop is meant to be a no-op + * @throws Throwable + */ + @Test + public void testFailingStop() throws Throwable { + BreakableService svc = new BreakableService(false, false, true); + svc.init(new Configuration()); + svc.start(); + try { + svc.stop(); + fail("Expected a failure, got " + svc); + } catch (BreakableService.BrokenLifecycleEvent e) { + //expected + } + assertStateCount(svc, Service.STATE.STOPPED, 1); + //now try again, and expect it to happen again + try { + svc.stop(); + fail("Expected a failure, got " + svc); + } catch (BreakableService.BrokenLifecycleEvent e) { + //expected + } + assertStateCount(svc, Service.STATE.STOPPED, 2); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java index 454ef2c2038..644089bbda3 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java @@ -527,7 +527,7 @@ public class TestProcfsBasedProcessTree { // Let us not create stat file for pid 100. 
Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch( - Integer.valueOf(pid), procfsRootDir.getAbsolutePath())); + pid, procfsRootDir.getAbsolutePath())); } finally { FileUtil.fullyDelete(procfsRootDir); } @@ -662,8 +662,8 @@ public class TestProcfsBasedProcessTree { */ private static boolean isAnyProcessInTreeAlive( ProcfsBasedProcessTree processTree) { - for (Integer pId : processTree.getCurrentProcessIDs()) { - if (isAlive(pId.toString())) { + for (String pId : processTree.getCurrentProcessIDs()) { + if (isAlive(pId)) { return true; } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index 840a5db9f49..dea4758a744 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -74,7 +74,7 @@ CFLAGS - -DHADOOP_CONF_DIR=${container-executor.conf.dir} + -DHADOOP_CONF_DIR=${container-executor.conf.dir} -m32 ${project.build.directory}/native/container-executor diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index 38eff3591f3..7d4de873e3f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.*; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; @@ -85,6 +86,7 @@ public class DeletionService extends AbstractService { sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); } + sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); sched.setKeepAliveTime(60L, SECONDS); super.init(conf); } @@ -92,14 +94,27 @@ public class DeletionService extends AbstractService { @Override public void stop() { sched.shutdown(); + boolean terminated = false; try { - sched.awaitTermination(10, SECONDS); + terminated = sched.awaitTermination(10, SECONDS); } catch (InterruptedException e) { + } + if (terminated != true) { sched.shutdownNow(); } super.stop(); } + /** + * Determine if the service has completely stopped. 
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 439b5e37a57..8f875e117ae 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -99,6 +99,8 @@ public class NodeManager extends CompositeService implements

   @Override
   public void init(Configuration conf) {
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
     Context context = new NMContext();

     // Create the secretManager if need be.
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 3169f2f1b8a..1eb22837939 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -137,6 +137,7 @@ public class ContainerManagerImpl extends CompositeService implements
     this.context = context;
     this.dirsHandler = dirsHandler;

+    // ContainerManager level dispatcher.
     dispatcher = new AsyncDispatcher();
     this.deletionService = deletionContext;
     this.metrics = metrics;
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
index abaad224d44..28b51c0632f 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
@@ -27,12 +27,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;

 import org.junit.AfterClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+
 import static org.junit.Assert.*;

 public class TestDeletionService {
@@ -107,12 +110,18 @@ public class TestDeletionService {
         del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
                    p, null);
       }
+
+      int msecToWait = 20 * 1000;
+      for (Path p : dirs) {
+        while (msecToWait > 0 && lfs.util().exists(p)) {
+          Thread.sleep(100);
+          msecToWait -= 100;
+        }
+        assertFalse(lfs.util().exists(p));
+      }
     } finally {
       del.stop();
     }
-    for (Path p : dirs) {
-      assertFalse(lfs.util().exists(p));
-    }
   }

   @Test
@@ -137,14 +146,35 @@ public class TestDeletionService {
         del.delete((Long.parseLong(p.getName()) % 2) == 0 ?
                    null : "dingo", p, baseDirs.toArray(new Path[4]));
       }
+
+      int msecToWait = 20 * 1000;
+      for (Path p : baseDirs) {
+        for (Path q : content) {
+          Path fp = new Path(p, q);
+          while (msecToWait > 0 && lfs.util().exists(fp)) {
+            Thread.sleep(100);
+            msecToWait -= 100;
+          }
+          assertFalse(lfs.util().exists(fp));
+        }
+      }
     } finally {
       del.stop();
     }
-    for (Path p : baseDirs) {
-      for (Path q : content) {
-        assertFalse(lfs.util().exists(new Path(p, q)));
-      }
-    }
   }

+  @Test
+  public void testStopWithDelayedTasks() throws Exception {
+    DeletionService del = new DeletionService(Mockito.mock(ContainerExecutor.class));
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 60);
+    del.init(conf);
+    del.start();
+    try {
+      del.delete("dingo", new Path("/does/not/exist"));
+    } finally {
+      del.stop();
+    }
+    assertTrue(del.isTerminated());
+  }
 }
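Because deletions now run asynchronously on the scheduler thread pool, the tests above replace the immediate assertions with a bounded poll of up to 20 seconds. The same idiom, factored into a hypothetical helper (names invented here, not part of the patch):

    // Hypothetical test helper illustrating the bounded-poll idiom above.
    final class WaitFor {
      private WaitFor() {}

      interface Condition {
        boolean holds() throws Exception;
      }

      /** Poll every 100 ms until the condition holds or the timeout elapses. */
      static boolean waitFor(Condition cond, long timeoutMsec) throws Exception {
        long remaining = timeoutMsec;
        while (remaining > 0 && !cond.holds()) {
          Thread.sleep(100);
          remaining -= 100;
        }
        return cond.holds();
      }
    }

A caller would wrap the existence check in a Condition and assert on the returned boolean, which keeps the 100 ms sleep and the timeout bookkeeping in one place.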
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 236489b7f4c..4b3736024b1 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -376,7 +376,7 @@ public class TestApplication {
     WrappedApplication(int id, long timestamp, String user, int numContainers) {
       dispatcher = new DrainDispatcher();
-      dispatcher.init(null);
+      dispatcher.init(new Configuration());

       localizerBus = mock(EventHandler.class);
       launcherBus = mock(EventHandler.class);
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index e4b7aa47a7a..bc6ec196e17 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -517,7 +517,7 @@ public class TestContainer {
     WrappedContainer(int appId, long timestamp, int id, String user,
         boolean withLocalRes, boolean withServiceData) {
       dispatcher = new DrainDispatcher();
-      dispatcher.init(null);
+      dispatcher.init(new Configuration());

       localizerBus = mock(EventHandler.class);
       launcherBus = mock(EventHandler.class);
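The switch from dispatcher.init(null) to dispatcher.init(new Configuration()) in these tests follows from the exit-on-error change: a dispatcher is now expected to consult Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY from the Configuration it is handed, so a null configuration no longer works. A sketch of that kind of init-time read, under the assumption that the real AsyncDispatcher does something similar (the class below is illustrative only, not the patch's code):

    // Illustrative only: shows why init(null) would fail for a dispatcher
    // that reads its exit-on-error flag from the Configuration it is given.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.event.Dispatcher;

    class SketchDispatcherInit {
      private boolean exitOnError;

      public void init(Configuration conf) {
        // conf.getBoolean(...) throws NullPointerException when conf is null.
        exitOnError = conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
      }
    }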
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
index a1c6bb84793..07d8df1db6c 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
@@ -17,6 +17,15 @@
  */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -29,19 +38,14 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
-
 import org.junit.Test;
-import static org.junit.Assert.*;
-
 import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;

 public class TestLocalizedResource {

@@ -62,7 +66,7 @@ public class TestLocalizedResource {
   @SuppressWarnings("unchecked") // mocked generic
   public void testNotification() throws Exception {
     DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(null);
+    dispatcher.init(new Configuration());
     try {
       dispatcher.start();
       EventHandler containerBus = mock(EventHandler.class);
@@ -175,7 +179,7 @@ public class TestLocalizedResource {
   @Test
   public void testDirectLocalization() throws Exception {
     DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(null);
+    dispatcher.init(new Configuration());
     try {
       dispatcher.start();
       LocalResource apiRsrc = createMockResource();
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 9886d37c73b..84cd7f22b22 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -18,8 +18,23 @@

 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

-import java.net.InetSocketAddress;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyShort;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;

+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,7 +48,6 @@ import java.util.Set;

 import junit.framework.Assert;

-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +56,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -60,7 +75,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
-import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -81,13 +95,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-
 import org.junit.Test;
-import static org.junit.Assert.*;
-
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;

 public class TestResourceLocalizationService {

@@ -98,11 +108,11 @@ public class TestResourceLocalizationService {
   public void testLocalizationInit() throws Exception {
     final Configuration conf = new Configuration();
     AsyncDispatcher dispatcher = new AsyncDispatcher();
-    dispatcher.init(null);
+    dispatcher.init(new Configuration());

     ContainerExecutor exec = mock(ContainerExecutor.class);
     DeletionService delService = spy(new DeletionService(exec));
-    delService.init(null);
+    delService.init(new Configuration());
     delService.start();

     AbstractFileSystem spylfs =
@@ -371,7 +381,7 @@ public class TestResourceLocalizationService {
     DeletionService delServiceReal = new DeletionService(exec);
     DeletionService delService = spy(delServiceReal);
-    delService.init(null);
+    delService.init(new Configuration());
     delService.start();

     ResourceLocalizationService rawService =
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index dc7d29cdcbc..bf01cef0058 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -131,6 +131,8 @@ public class ResourceManager extends CompositeService implements Recoverable {

     this.conf = conf;

+    this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
     this.rmDispatcher = createDispatcher();
     addIfService(this.rmDispatcher);

@@ -265,6 +267,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
     private final BlockingQueue<SchedulerEvent> eventQueue =
       new LinkedBlockingQueue<SchedulerEvent>();
     private final Thread eventProcessor;
+    private volatile boolean stopped = false;

     public SchedulerEventDispatcher(ResourceScheduler scheduler) {
       super(SchedulerEventDispatcher.class.getName());
@@ -285,7 +288,7 @@ public class ResourceManager extends CompositeService implements Recoverable {

       SchedulerEvent event;

-      while (!Thread.currentThread().isInterrupted()) {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
         try {
           event = eventQueue.take();
         } catch (InterruptedException e) {
@@ -296,9 +299,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
         try {
           scheduler.handle(event);
         } catch (Throwable t) {
-          LOG.error("Error in handling event type " + event.getType()
+          LOG.fatal("Error in handling event type " + event.getType()
               + " to the scheduler", t);
-          return; // TODO: Kill RM.
+          if (getConfig().getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+              Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR)) {
+            LOG.info("Exiting, bbye..");
+            System.exit(-1);
+          }
         }
       }
     }
@@ -306,6 +313,7 @@ public class ResourceManager extends CompositeService implements Recoverable {

     @Override
     public synchronized void stop() {
+      this.stopped = true;
       this.eventProcessor.interrupt();
       try {
         this.eventProcessor.join();