Merge r1245750 through r1291971 from 0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1291974 13f79535-47bb-0310-9956-ffa450edef68
Author: Tsz-wo Sze
Date: 2012-02-21 19:56:04 +00:00
Commit: 0e57409872
52 changed files with 1397 additions and 520 deletions

View File

@ -90,7 +90,6 @@ Release 0.23.2 - UNRELEASED
(sharad, todd via todd)
BUG FIXES
HADOOP-8054 NPE with FilterFileSystem (Daryn Sharp via bobby)
HADOOP-8042 When copying a file out of HDFS, modifying it, and uploading
it back into HDFS, the put fails due to a CRC mismatch
@ -117,7 +116,21 @@ Release 0.23.2 - UNRELEASED
HADOOP-8083 javadoc generation for some modules is not done under target/ (tucu)
Release 0.23.1 - 2012-02-08
HADOOP-8036. TestViewFsTrash assumes the user's home directory is
2 levels deep. (Colin Patrick McCabe via eli)
HADOOP-8046 Revert StaticMapping semantics to the existing ones, add DNS
mapping diagnostics in progress (stevel)
HADOOP-8057 hadoop-setup-conf.sh not working because of some extra spaces.
(Vinayakumar B via stevel)
HADOOP-7680 TestHardLink fails on Mac OS X, when gnu stat is in path.
(Milind Bhandarkar via stevel)
HADOOP-8050. Deadlock in metrics. (Kihwal Lee via mattf)
Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES
@ -340,6 +353,8 @@ Release 0.23.1 - 2012-02-08
HADOOP-8013. ViewFileSystem does not honor setVerifyChecksum
(Daryn Sharp via bobby)
HADOOP-8054 NPE with FilterFileSystem (Daryn Sharp via bobby)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -64,7 +64,7 @@ public class HardLink {
//override getLinkCountCommand for the particular Unix variant
//Linux is already set as the default - {"stat","-c%h", null}
if (osType == OSType.OS_TYPE_MAC) {
String[] linkCountCmdTemplate = {"stat","-f%l", null};
String[] linkCountCmdTemplate = {"/usr/bin/stat","-f%l", null};
HardLinkCGUnix.setLinkCountCmdTemplate(linkCountCmdTemplate);
} else if (osType == OSType.OS_TYPE_SOLARIS) {
String[] linkCountCmdTemplate = {"ls","-l", null};

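The hunk above (HADOOP-7680) pins the Mac link-count command to /usr/bin/stat so a GNU stat earlier on the PATH cannot change the output format. As a rough standalone illustration of what filling in that {"/usr/bin/stat","-f%l",<file>} template and parsing its output amounts to (the file path is just an example, and this only applies where BSD stat lives at /usr/bin/stat):

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class LinkCountDemo {
  public static void main(String[] args) throws Exception {
    String file = args.length > 0 ? args[0] : "/etc/hosts";   // illustrative target file
    // Same shape as the patched template: {"/usr/bin/stat", "-f%l", <file>}
    Process p = new ProcessBuilder("/usr/bin/stat", "-f%l", file).start();
    try (BufferedReader r =
        new BufferedReader(new InputStreamReader(p.getInputStream()))) {
      String line = r.readLine();       // BSD stat prints just the hard-link count
      System.out.println(file + " has " + Integer.parseInt(line.trim()) + " hard link(s)");
    }
    p.waitFor();
  }
}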
View File

@ -94,17 +94,19 @@ class MetricsSourceAdapter implements DynamicMBean {
}
@Override
public synchronized Object getAttribute(String attribute)
public Object getAttribute(String attribute)
throws AttributeNotFoundException, MBeanException, ReflectionException {
updateJmxCache();
Attribute a = attrCache.get(attribute);
if (a == null) {
throw new AttributeNotFoundException(attribute +" not found");
synchronized(this) {
Attribute a = attrCache.get(attribute);
if (a == null) {
throw new AttributeNotFoundException(attribute +" not found");
}
if (LOG.isDebugEnabled()) {
LOG.debug(attribute +": "+ a);
}
return a.getValue();
}
if (LOG.isDebugEnabled()) {
LOG.debug(attribute +": "+ a);
}
return a.getValue();
}
@Override
@ -115,17 +117,19 @@ class MetricsSourceAdapter implements DynamicMBean {
}
@Override
public synchronized AttributeList getAttributes(String[] attributes) {
public AttributeList getAttributes(String[] attributes) {
updateJmxCache();
AttributeList ret = new AttributeList();
for (String key : attributes) {
Attribute attr = attrCache.get(key);
if (LOG.isDebugEnabled()) {
LOG.debug(key +": "+ attr);
synchronized(this) {
AttributeList ret = new AttributeList();
for (String key : attributes) {
Attribute attr = attrCache.get(key);
if (LOG.isDebugEnabled()) {
LOG.debug(key +": "+ attr);
}
ret.add(attr);
}
ret.add(attr);
return ret;
}
return ret;
}
@Override
@ -140,17 +144,32 @@ class MetricsSourceAdapter implements DynamicMBean {
}
@Override
public synchronized MBeanInfo getMBeanInfo() {
public MBeanInfo getMBeanInfo() {
updateJmxCache();
return infoCache;
}
private synchronized void updateJmxCache() {
if (System.currentTimeMillis() - jmxCacheTS >= jmxCacheTTL) {
if (lastRecs == null) {
MetricsCollectorImpl builder = new MetricsCollectorImpl();
getMetrics(builder, true);
private void updateJmxCache() {
boolean getAllMetrics = false;
synchronized(this) {
if (System.currentTimeMillis() - jmxCacheTS >= jmxCacheTTL) {
// temporarily advance the expiry while updating the cache
jmxCacheTS = System.currentTimeMillis() + jmxCacheTTL;
if (lastRecs == null) {
getAllMetrics = true;
}
}
else {
return;
}
}
if (getAllMetrics) {
MetricsCollectorImpl builder = new MetricsCollectorImpl();
getMetrics(builder, true);
}
synchronized(this) {
int oldCacheSize = attrCache.size();
int newCacheSize = updateAttrCache();
if (oldCacheSize < newCacheSize) {

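The reworked updateJmxCache above (HADOOP-8050) stops holding the adapter's monitor across the getMetrics() call, which can take other locks and previously led to a deadlock. A minimal sketch of the same shape, using made-up names rather than the real MetricsSourceAdapter fields: check and bump the expiry under the lock, do the expensive collection outside it, then re-take the lock to publish.

import java.util.concurrent.TimeUnit;

public class CachedSnapshot {
  private final long ttlMillis = TimeUnit.SECONDS.toMillis(10);
  private long expiryTs;        // guarded by 'this'
  private String cache = "";    // guarded by 'this'

  String read() {
    refreshIfStale();
    synchronized (this) {
      return cache;
    }
  }

  private void refreshIfStale() {
    synchronized (this) {
      long now = System.currentTimeMillis();
      if (now < expiryTs) {
        return;                            // cache still fresh, nothing to do
      }
      expiryTs = now + ttlMillis;          // advance the expiry so only one caller rebuilds
    }
    // The expensive collection happens WITHOUT holding 'this', so it cannot
    // deadlock against code that takes its own locks and then calls back in.
    String rebuilt = expensiveCollect();
    synchronized (this) {
      cache = rebuilt;                     // publish the new snapshot under the lock again
    }
  }

  private String expensiveCollect() {
    return "metrics@" + System.currentTimeMillis();
  }

  public static void main(String[] args) {
    System.out.println(new CachedSnapshot().read());
  }
}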
View File

@ -22,6 +22,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* This is a base class for DNS to Switch mappings. <p/> It is not mandatory to
@ -89,6 +95,49 @@ public abstract class AbstractDNSToSwitchMapping
return false;
}
/**
* Get a copy of the map (for diagnostics)
* @return a clone of the map or null for none known
*/
public Map<String, String> getSwitchMap() {
return null;
}
/**
* Generate a string listing the switch mapping implementation,
* the mapping for every known node and the number of nodes and
* unique switches known about -each entry to a separate line.
* @return a string that can be presented to the ops team or used in
* debug messages.
*/
public String dumpTopology() {
Map<String, String> rack = getSwitchMap();
StringBuilder builder = new StringBuilder();
builder.append("Mapping: ").append(toString()).append("\n");
if (rack != null) {
builder.append("Map:\n");
Set<String> switches = new HashSet<String>();
for (Map.Entry<String, String> entry : rack.entrySet()) {
builder.append(" ")
.append(entry.getKey())
.append(" -> ")
.append(entry.getValue())
.append("\n");
switches.add(entry.getValue());
}
builder.append("Nodes: ").append(rack.size()).append("\n");
builder.append("Switches: ").append(switches.size()).append("\n");
} else {
builder.append("No topology information");
}
return builder.toString();
}
protected boolean isSingleSwitchByScriptPolicy() {
return conf != null
&& conf.get(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null;
}
/**
* Query for a {@link DNSToSwitchMapping} instance being on a single
* switch.
@ -100,7 +149,7 @@ public abstract class AbstractDNSToSwitchMapping
* is not derived from this class.
*/
public static boolean isMappingSingleSwitch(DNSToSwitchMapping mapping) {
return mapping instanceof AbstractDNSToSwitchMapping
return mapping != null && mapping instanceof AbstractDNSToSwitchMapping
&& ((AbstractDNSToSwitchMapping) mapping).isSingleSwitch();
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.net;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -123,6 +124,22 @@ public class CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping {
}
/**
* Get the (host x switch) map.
* @return a copy of the cached map of hosts to rack
*/
@Override
public Map<String, String> getSwitchMap() {
Map<String, String > switchMap = new HashMap<String, String>(cache);
return switchMap;
}
@Override
public String toString() {
return "cached switch mapping relaying to " + rawMapping;
}
/**
* Delegate the switch topology query to the raw mapping, via
* {@link AbstractDNSToSwitchMapping#isMappingSingleSwitch(DNSToSwitchMapping)}

View File

@ -66,9 +66,15 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY ;
/**
* key to the argument count that the script supports
* {@value}
*/
static final String SCRIPT_ARG_COUNT_KEY =
CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY ;
/**
* Text used in the {@link #toString()} method if there is no string
* {@value}
*/
public static final String NO_SCRIPT = "no script";
/**
* Create an instance with the default configuration.
@ -104,6 +110,11 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
return getRawMapping().getConf();
}
@Override
public String toString() {
return "script-based mapping with " + getRawMapping().toString();
}
/**
* {@inheritDoc}
* <p/>
@ -231,7 +242,7 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
s.execute();
allOutput.append(s.getOutput()).append(" ");
} catch (Exception e) {
LOG.warn("Exception: ", e);
LOG.warn("Exception running " + s, e);
return null;
}
loopCount++;
@ -248,5 +259,10 @@ public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
public boolean isSingleSwitch() {
return scriptName == null;
}
@Override
public String toString() {
return scriptName != null ? ("script " + scriptName) : NO_SCRIPT;
}
}
}

View File

@ -246,7 +246,7 @@ OPTS=$(getopt \
-l 'dfs-datanode-dir-perm:' \
-l 'dfs-block-local-path-access-user:' \
-l 'dfs-client-read-shortcircuit:' \
-l 'dfs-client-read-shortcircuit-skip-checksum:' \
-l 'dfs-client-read-shortcircuit-skip-checksum:' \
-o 'h' \
-- "$@")

View File

@ -37,15 +37,15 @@ public class TestFSMainOperationsLocalFileSystem extends FSMainOperationsBaseTes
public void setUp() throws Exception {
Configuration conf = new Configuration();
fcTarget = FileSystem.getLocal(conf);
fSys = ViewFileSystemTestSetup.setupForViewFs(
ViewFileSystemTestSetup.configWithViewfsScheme(), fcTarget);
fSys = ViewFileSystemTestSetup.setupForViewFileSystem(
ViewFileSystemTestSetup.createConfig(), fcTarget);
super.setUp();
}
@After
public void tearDown() throws Exception {
super.tearDown();
ViewFileSystemTestSetup.tearDownForViewFs(fcTarget);
ViewFileSystemTestSetup.tearDown(fcTarget);
}
@Test

View File

@ -40,12 +40,12 @@ public class TestViewFileSystemDelegation { //extends ViewFileSystemTestSetup {
@BeforeClass
public static void setup() throws Exception {
conf = ViewFileSystemTestSetup.configWithViewfsScheme();
conf = ViewFileSystemTestSetup.createConfig();
fs1 = setupFileSystem(new URI("fs1:/"), FakeFileSystem.class);
fs2 = setupFileSystem(new URI("fs2:/"), FakeFileSystem.class);
viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
}
static FakeFileSystem setupFileSystem(URI uri, Class clazz)
throws Exception {
String scheme = uri.getScheme();

View File

@ -35,7 +35,6 @@ import org.mortbay.log.Log;
public class TestViewFsTrash {
FileSystem fsTarget; // the target file system - the mount will point here
FileSystem fsView;
Path targetTestRoot;
Configuration conf;
static class TestLFS extends LocalFileSystem {
@ -55,52 +54,19 @@ public class TestViewFsTrash {
@Before
public void setUp() throws Exception {
fsTarget = FileSystem.getLocal(new Configuration());
targetTestRoot = FileSystemTestHelper.getAbsoluteTestRootPath(fsTarget);
// In case previous test was killed before cleanup
fsTarget.delete(targetTestRoot, true);
// cleanup trash from previous run if it stuck around
fsTarget.delete(new Path(fsTarget.getHomeDirectory(), ".Trash/Current"),
true);
fsTarget.mkdirs(targetTestRoot);
fsTarget.mkdirs(new Path(targetTestRoot,"dir1"));
// Now we use the mount fs to set links to user and dir
// in the test root
// Set up the defaultMT in the config with our mount point links
conf = ViewFileSystemTestSetup.configWithViewfsScheme();
// create a link for home directory so that trash path works
// set up viewfs's home dir root to point to home dir root on target
// But home dir is different on linux, mac etc.
// Figure it out by calling home dir on target
String homeDirRoot = fsTarget.getHomeDirectory()
.getParent().toUri().getPath();
ConfigUtil.addLink(conf, homeDirRoot,
fsTarget.makeQualified(new Path(homeDirRoot)).toUri());
ConfigUtil.setHomeDirConf(conf, homeDirRoot);
Log.info("Home dir base " + homeDirRoot);
fsView = ViewFileSystemTestSetup.setupForViewFs(conf, fsTarget);
// set working dir so that relative paths
//fsView.setWorkingDirectory(new Path(fsTarget.getWorkingDirectory().toUri().getPath()));
fsTarget.mkdirs(new Path(FileSystemTestHelper.
getTestRootPath(fsTarget), "dir1"));
conf = ViewFileSystemTestSetup.createConfig();
fsView = ViewFileSystemTestSetup.setupForViewFileSystem(conf, fsTarget);
conf.set("fs.defaultFS", FsConstants.VIEWFS_URI.toString());
}
@After
public void tearDown() throws Exception {
fsTarget.delete(targetTestRoot, true);
ViewFileSystemTestSetup.tearDown(fsTarget);
fsTarget.delete(new Path(fsTarget.getHomeDirectory(), ".Trash/Current"),
true);
}
@Test
public void testTrash() throws IOException {

View File

@ -89,7 +89,7 @@ public class ViewFileSystemBaseTest {
// Set up the defaultMT in the config with our mount point links
//Configuration conf = new Configuration();
conf = ViewFileSystemTestSetup.configWithViewfsScheme();
conf = ViewFileSystemTestSetup.createConfig();
setupMountPoints();
fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf);
}

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.mortbay.log.Log;
/**
@ -46,32 +47,21 @@ public class ViewFileSystemTestSetup {
* @return return the ViewFS File context to be used for tests
* @throws Exception
*/
static public FileSystem setupForViewFs(Configuration conf, FileSystem fsTarget) throws Exception {
static public FileSystem setupForViewFileSystem(Configuration conf, FileSystem fsTarget) throws Exception {
/**
* create the test root on local_fs - the mount table will point here
*/
Path targetOfTests = FileSystemTestHelper.getTestRootPath(fsTarget);
// In case previous test was killed before cleanup
fsTarget.delete(targetOfTests, true);
fsTarget.mkdirs(targetOfTests);
fsTarget.mkdirs(FileSystemTestHelper.getTestRootPath(fsTarget));
// viewFs://home => fsTarget://home
String homeDirRoot = fsTarget.getHomeDirectory()
.getParent().toUri().getPath();
ConfigUtil.addLink(conf, homeDirRoot,
fsTarget.makeQualified(new Path(homeDirRoot)).toUri());
ConfigUtil.setHomeDirConf(conf, homeDirRoot);
Log.info("Home dir base " + homeDirRoot);
// Now set up a link from viewfs to targetfs for the first component of
// path of testdir. For example, if testdir is /user/<userid>/xx then
// a link from /user to targetfs://user.
String testDir = FileSystemTestHelper.getTestRootPath(fsTarget).toUri().getPath();
int indexOf2ndSlash = testDir.indexOf('/', 1);
String testDirFirstComponent = testDir.substring(0, indexOf2ndSlash);
ConfigUtil.addLink(conf, testDirFirstComponent,
fsTarget.makeQualified(new Path(testDirFirstComponent)).toUri());
FileSystem fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf);
//System.out.println("SRCOfTests = "+ getTestRootPath(fs, "test"));
//System.out.println("TargetOfTests = "+ targetOfTests.toUri());
return fsView;
}
@ -79,12 +69,12 @@ public class ViewFileSystemTestSetup {
*
* delete the test directory in the target fs
*/
static public void tearDownForViewFs(FileSystem fsTarget) throws Exception {
static public void tearDown(FileSystem fsTarget) throws Exception {
Path targetOfTests = FileSystemTestHelper.getTestRootPath(fsTarget);
fsTarget.delete(targetOfTests, true);
}
public static Configuration configWithViewfsScheme() {
public static Configuration createConfig() {
Configuration conf = new Configuration();
conf.set("fs.viewfs.impl", ViewFileSystem.class.getName());
return conf;

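The renamed helpers above reduce viewfs test setup to two ingredients: registering the viewfs scheme and adding mount links. A hedged usage sketch outside the test helpers, assuming hadoop-common on the classpath; the /tmp mount and the file: target are illustrative, not part of the patch:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.fs.viewfs.ViewFileSystem;

public class ViewFsSetupDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.viewfs.impl", ViewFileSystem.class.getName());
    // illustrative mount: viewfs:///tmp is backed by file:/tmp
    ConfigUtil.addLink(conf, "/tmp", new URI("file:///tmp"));
    FileSystem viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
    System.out.println(viewFs.makeQualified(new Path("/tmp")));
  }
}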
View File

@ -21,8 +21,10 @@ import org.apache.hadoop.conf.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Implements the {@link DNSToSwitchMapping} via static mappings. Used
@ -34,6 +36,10 @@ import java.util.Map;
* When an instance of the class has its {@link #setConf(Configuration)}
* method called, nodes listed in the configuration will be added to the map.
* These do not get removed when the instance is garbage collected.
*
* The switch mapping policy of this class is the same as for the
* {@link ScriptBasedMapping} -the presence of a non-empty topology script.
* The script itself is not used.
*/
public class StaticMapping extends AbstractDNSToSwitchMapping {
@ -109,12 +115,30 @@ public class StaticMapping extends AbstractDNSToSwitchMapping {
}
/**
* Declare that this mapping is always multi-switch
* The switch policy of this mapping is driven by the same policy
* as the Scripted mapping: the presence of the script name in
* the configuration file
* @return false, always
*/
@Override
public boolean isSingleSwitch() {
return false;
return isSingleSwitchByScriptPolicy();
}
/**
* Get a copy of the map (for diagnostics)
* @return a clone of the map or null for none known
*/
@Override
public Map<String, String> getSwitchMap() {
synchronized (nameToRackMap) {
return new HashMap<String, String>(nameToRackMap);
}
}
@Override
public String toString() {
return "static mapping with single switch = " + isSingleSwitch();
}
/**

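With getSwitchMap() and dumpTopology() now available on StaticMapping, the new diagnostics can be exercised directly. A small usage sketch, assuming hadoop-common on the classpath; the node names and rack paths are made up:

import java.util.Arrays;

import org.apache.hadoop.net.StaticMapping;

public class TopologyDumpDemo {
  public static void main(String[] args) {
    StaticMapping mapping = new StaticMapping();
    // the node-to-rack map is static, i.e. shared by all StaticMapping instances
    StaticMapping.addNodeToRack("n1", "/r1");
    StaticMapping.addNodeToRack("n2", "/r2");
    // unknown hosts fall back to the default rack
    System.out.println(mapping.resolve(Arrays.asList("n1", "unknown")));
    // new diagnostics: mapping description, every known node, node and switch counts
    System.out.println(mapping.dumpTopology());
  }
}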
View File

@ -18,22 +18,27 @@
package org.apache.hadoop.net;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Test the static mapping class.
* Because the map is actually static, this map needs to be reset for every test
*/
public class TestStaticMapping extends Assert {
private static final Log LOG = LogFactory.getLog(TestStaticMapping.class);
/**
* Reset the map then create a new instance of the {@link StaticMapping}
* class
* class with a null configuration
* @return a new instance
*/
private StaticMapping newInstance() {
@ -41,63 +46,195 @@ public class TestStaticMapping extends Assert {
return new StaticMapping();
}
@Test
public void testStaticIsSingleSwitch() throws Throwable {
/**
* Reset the map then create a new instance of the {@link StaticMapping}
* class with the topology script in the configuration set to
* the parameter
* @param script a (never executed) script, can be null
* @return a new instance
*/
private StaticMapping newInstance(String script) {
StaticMapping mapping = newInstance();
assertFalse("Empty maps should not be not single switch",
mapping.isSingleSwitch());
mapping.setConf(createConf(script));
return mapping;
}
/**
* Create a configuration with a specific topology script
* @param script a (never executed) script, can be null
* @return a configuration
*/
private Configuration createConf(String script) {
Configuration conf = new Configuration();
if (script != null) {
conf.set(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
script);
} else {
conf.unset(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY);
}
return conf;
}
private void assertSingleSwitch(DNSToSwitchMapping mapping) {
assertEquals("Expected a single switch mapping "
+ mapping,
true,
AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping));
}
private void assertMultiSwitch(DNSToSwitchMapping mapping) {
assertEquals("Expected a multi switch mapping "
+ mapping,
false,
AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping));
}
protected void assertMapSize(AbstractDNSToSwitchMapping switchMapping, int expectedSize) {
assertEquals(
"Expected " + expectedSize + " entries in the map " + switchMapping.dumpTopology(),
expectedSize, switchMapping.getSwitchMap().size());
}
private List<String> createQueryList() {
List<String> l1 = new ArrayList<String>(2);
l1.add("n1");
l1.add("unknown");
return l1;
}
@Test
public void testCachingRelaysQueries() throws Throwable {
StaticMapping staticMapping = newInstance();
CachedDNSToSwitchMapping mapping =
new CachedDNSToSwitchMapping(staticMapping);
StaticMapping.addNodeToRack("n1", "r1");
assertFalse("Expected multi switch", mapping.isSingleSwitch());
public void testStaticIsSingleSwitchOnNullScript() throws Throwable {
StaticMapping mapping = newInstance(null);
mapping.setConf(createConf(null));
assertSingleSwitch(mapping);
}
@Test
public void testStaticIsMultiSwitchOnScript() throws Throwable {
StaticMapping mapping = newInstance("ls");
assertMultiSwitch(mapping);
}
@Test
public void testAddResolveNodes() throws Throwable {
StaticMapping mapping = newInstance();
StaticMapping.addNodeToRack("n1", "r1");
List<String> l1 = new ArrayList<String>(2);
l1.add("n1");
l1.add("unknown");
List<String> mappings = mapping.resolve(l1);
assertEquals(2, mappings.size());
assertEquals("r1", mappings.get(0));
assertEquals(NetworkTopology.DEFAULT_RACK, mappings.get(1));
assertFalse("Mapping is still single switch", mapping.isSingleSwitch());
StaticMapping.addNodeToRack("n1", "/r1");
List<String> queryList = createQueryList();
List<String> resolved = mapping.resolve(queryList);
assertEquals(2, resolved.size());
assertEquals("/r1", resolved.get(0));
assertEquals(NetworkTopology.DEFAULT_RACK, resolved.get(1));
// get the switch map and examine it
Map<String, String> switchMap = mapping.getSwitchMap();
String topology = mapping.dumpTopology();
LOG.info(topology);
assertEquals(topology, 1, switchMap.size());
assertEquals(topology, "/r1", switchMap.get("n1"));
}
/**
* Verify that a configuration string builds a topology
*/
@Test
public void testReadNodesFromConfig() throws Throwable {
StaticMapping mapping = newInstance();
Configuration conf = new Configuration();
conf.set(StaticMapping.KEY_HADOOP_CONFIGURED_NODE_MAPPING, "n1=r1,n2=r2");
conf.set(StaticMapping.KEY_HADOOP_CONFIGURED_NODE_MAPPING, "n1=/r1,n2=/r2");
mapping.setConf(conf);
//even though we have inserted elements into the list, because
//it is driven by the script key in the configuration, it still
//thinks that it is single rack
assertSingleSwitch(mapping);
List<String> l1 = new ArrayList<String>(3);
l1.add("n1");
l1.add("unknown");
l1.add("n2");
List<String> mappings = mapping.resolve(l1);
assertEquals(3, mappings.size());
assertEquals("r1", mappings.get(0));
assertEquals(NetworkTopology.DEFAULT_RACK, mappings.get(1));
assertEquals("r2", mappings.get(2));
assertFalse("Expected to be multi switch",
AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping));
List<String> resolved = mapping.resolve(l1);
assertEquals(3, resolved.size());
assertEquals("/r1", resolved.get(0));
assertEquals(NetworkTopology.DEFAULT_RACK, resolved.get(1));
assertEquals("/r2", resolved.get(2));
Map<String, String> switchMap = mapping.getSwitchMap();
String topology = mapping.dumpTopology();
LOG.info(topology);
assertEquals(topology, 2, switchMap.size());
assertEquals(topology, "/r1", switchMap.get("n1"));
assertNull(topology, switchMap.get("unknown"));
}
/**
* Verify that if the inner mapping is single-switch, so is the cached one
* @throws Throwable on any problem
*/
@Test
public void testNullConfiguration() throws Throwable {
StaticMapping mapping = newInstance();
mapping.setConf(null);
assertFalse("Null maps are expected to be multi switch",
mapping.isSingleSwitch());
assertFalse("Expected to be multi switch",
AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping));
public void testCachingRelaysSingleSwitchQueries() throws Throwable {
//create a single switch map
StaticMapping staticMapping = newInstance(null);
assertSingleSwitch(staticMapping);
CachedDNSToSwitchMapping cachedMap =
new CachedDNSToSwitchMapping(staticMapping);
LOG.info("Mapping: " + cachedMap + "\n" + cachedMap.dumpTopology());
assertSingleSwitch(cachedMap);
}
/**
* Verify that if the inner mapping is multi-switch, so is the cached one
* @throws Throwable on any problem
*/
@Test
public void testCachingRelaysMultiSwitchQueries() throws Throwable {
StaticMapping staticMapping = newInstance("top");
assertMultiSwitch(staticMapping);
CachedDNSToSwitchMapping cachedMap =
new CachedDNSToSwitchMapping(staticMapping);
LOG.info("Mapping: " + cachedMap + "\n" + cachedMap.dumpTopology());
assertMultiSwitch(cachedMap);
}
/**
* This test verifies that resolution queries get relayed to the inner rack
* @throws Throwable on any problem
*/
@Test
public void testCachingRelaysResolveQueries() throws Throwable {
StaticMapping mapping = newInstance();
mapping.setConf(createConf("top"));
StaticMapping staticMapping = mapping;
CachedDNSToSwitchMapping cachedMap =
new CachedDNSToSwitchMapping(staticMapping);
assertMapSize(cachedMap, 0);
//add a node to the static map
StaticMapping.addNodeToRack("n1", "/r1");
//verify it is there
assertMapSize(staticMapping, 1);
//verify that the cache hasn't picked it up yet
assertMapSize(cachedMap, 0);
//now relay the query
cachedMap.resolve(createQueryList());
//and verify the cache is no longer empty
assertMapSize(cachedMap, 2);
}
/**
* This test verifies that unresolved (negative) lookups are cached as well
* @throws Throwable on any problem
*/
@Test
public void testCachingCachesNegativeEntries() throws Throwable {
StaticMapping staticMapping = newInstance();
CachedDNSToSwitchMapping cachedMap =
new CachedDNSToSwitchMapping(staticMapping);
assertMapSize(cachedMap, 0);
assertMapSize(staticMapping, 0);
List<String> resolved = cachedMap.resolve(createQueryList());
//and verify the cache is no longer empty while the static map is
assertMapSize(staticMapping, 0);
assertMapSize(cachedMap, 2);
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.net;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.junit.Assert;
import org.junit.Test;
@ -28,22 +30,87 @@ import java.util.List;
*/
public class TestSwitchMapping extends Assert {
/**
* Verify the switch mapping query handles arbitrary DNSToSwitchMapping
* implementations
*
* @throws Throwable on any problem
*/
@Test
public void testStandaloneClassesAssumedMultiswitch() throws Throwable {
DNSToSwitchMapping mapping = new StandaloneSwitchMapping();
assertFalse("Expected to be multi switch",
assertFalse("Expected to be multi switch " + mapping,
AbstractDNSToSwitchMapping.isMappingSingleSwitch(mapping));
}
/**
* Verify the cached mapper delegates the switch mapping query to the inner
* mapping, which again handles arbitrary DNSToSwitchMapping implementations
*
* @throws Throwable on any problem
*/
@Test
public void testCachingRelays() throws Throwable {
CachedDNSToSwitchMapping mapping =
new CachedDNSToSwitchMapping(new StandaloneSwitchMapping());
assertFalse("Expected to be multi switch",
assertFalse("Expected to be multi switch " + mapping,
mapping.isSingleSwitch());
}
/**
* Verify the cached mapper delegates the switch mapping query to the inner
* mapping, which again handles arbitrary DNSToSwitchMapping implementations
*
* @throws Throwable on any problem
*/
@Test
public void testCachingRelaysStringOperations() throws Throwable {
Configuration conf = new Configuration();
String scriptname = "mappingscript.sh";
conf.set(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
scriptname);
ScriptBasedMapping scriptMapping = new ScriptBasedMapping(conf);
assertTrue("Did not find " + scriptname + " in " + scriptMapping,
scriptMapping.toString().contains(scriptname));
CachedDNSToSwitchMapping mapping =
new CachedDNSToSwitchMapping(scriptMapping);
assertTrue("Did not find " + scriptname + " in " + mapping,
mapping.toString().contains(scriptname));
}
/**
* Verify the cached mapper delegates the switch mapping query to the inner
* mapping, which again handles arbitrary DNSToSwitchMapping implementations
*
* @throws Throwable on any problem
*/
@Test
public void testCachingRelaysStringOperationsToNullScript() throws Throwable {
Configuration conf = new Configuration();
ScriptBasedMapping scriptMapping = new ScriptBasedMapping(conf);
assertTrue("Did not find " + ScriptBasedMapping.NO_SCRIPT
+ " in " + scriptMapping,
scriptMapping.toString().contains(ScriptBasedMapping.NO_SCRIPT));
CachedDNSToSwitchMapping mapping =
new CachedDNSToSwitchMapping(scriptMapping);
assertTrue("Did not find " + ScriptBasedMapping.NO_SCRIPT
+ " in " + mapping,
mapping.toString().contains(ScriptBasedMapping.NO_SCRIPT));
}
@Test
public void testNullMapping() {
assertFalse(AbstractDNSToSwitchMapping.isMappingSingleSwitch(null));
}
/**
* This class does not extend the abstract switch mapping, and verifies that
* the switch mapping logic assumes that this is multi switch
*/
private static class StandaloneSwitchMapping implements DNSToSwitchMapping {
@Override
public List<String> resolve(List<String> names) {

View File

@ -144,6 +144,9 @@ Release 0.23.2 - UNRELEASED
HDFS-2655. BlockReaderLocal#skip performs unnecessary IO. (Brandon Li
via jitendra)
HDFS-2725. hdfs script usage information is missing the information
about "dfs" command (Prashant Sharma via stevel)
OPTIMIZATIONS
BUG FIXES
@ -169,7 +172,9 @@ Release 0.23.2 - UNRELEASED
HDFS-2938. Recursive delete of a large directory make namenode
unresponsive. (Hari Mankude via suresh)
Release 0.23.1 - 2012-02-08
HDFS-2969. ExtendedBlock.equals is incorrectly implemented (todd)
Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES

View File

@ -26,6 +26,7 @@ HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
function print_usage(){
echo "Usage: hdfs [--config confdir] COMMAND"
echo " where COMMAND is one of:"
echo " dfs run a filesystem command on the file systems supported in Hadoop."
echo " namenode -format format the DFS filesystem"
echo " secondarynamenode run the DFS secondary namenode"
echo " namenode run the DFS namenode"

View File

@ -145,7 +145,7 @@ public class ExtendedBlock implements Writable {
return false;
}
ExtendedBlock b = (ExtendedBlock)o;
return b.block.equals(block) || b.poolId.equals(poolId);
return b.block.equals(block) && b.poolId.equals(poolId);
}
@Override // Object

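HDFS-2969, fixed above, was a one-character bug: with '||', two blocks compared equal whenever they merely shared the pool id or the block, which breaks map lookups and set membership. A simplified stand-in class (not the real ExtendedBlock) showing the intended '&&' contract together with a hashCode that agrees with it:

public final class BlockKey {
  private final String poolId;
  private final long blockId;

  public BlockKey(String poolId, long blockId) {
    this.poolId = poolId;
    this.blockId = blockId;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof BlockKey)) {
      return false;
    }
    BlockKey b = (BlockKey) o;
    // '&&', not '||': both the pool and the block must match for equality
    return poolId.equals(b.poolId) && blockId == b.blockId;
  }

  @Override
  public int hashCode() {
    return 31 * poolId.hashCode() + Long.hashCode(blockId);
  }

  public static void main(String[] args) {
    // same block id in two different pools must NOT be equal
    System.out.println(new BlockKey("pool-a", 1L).equals(new BlockKey("pool-b", 1L))); // false
    System.out.println(new BlockKey("pool-a", 1L).equals(new BlockKey("pool-a", 1L))); // true
  }
}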
View File

@ -73,7 +73,7 @@ public class TestViewFsFileStatusHdfs {
long len = FileSystemTestHelper.createFile(fHdfs, testfilename);
Configuration conf = ViewFileSystemTestSetup.configWithViewfsScheme();
Configuration conf = ViewFileSystemTestSetup.createConfig();
ConfigUtil.addLink(conf, "/tmp", new URI(fHdfs.getUri().toString() + "/tmp"));
FileSystem vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
assertEquals(ViewFileSystem.class, vfs.getClass());

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import static org.junit.Assert.*;
import org.junit.Test;
public class TestExtendedBlock {
static final String POOL_A = "blockpool-a";
static final String POOL_B = "blockpool-b";
static final Block BLOCK_1_GS1 = new Block(1L, 100L, 1L);
static final Block BLOCK_1_GS2 = new Block(1L, 100L, 2L);
static final Block BLOCK_2_GS1 = new Block(2L, 100L, 1L);
@Test
public void testEquals() {
// Same block -> equal
assertEquals(
new ExtendedBlock(POOL_A, BLOCK_1_GS1),
new ExtendedBlock(POOL_A, BLOCK_1_GS1));
// Different pools, same block id -> not equal
assertNotEquals(
new ExtendedBlock(POOL_A, BLOCK_1_GS1),
new ExtendedBlock(POOL_B, BLOCK_1_GS1));
// Same pool, different block id -> not equal
assertNotEquals(
new ExtendedBlock(POOL_A, BLOCK_1_GS1),
new ExtendedBlock(POOL_A, BLOCK_2_GS1));
// Same block, different genstamps -> equal
assertEquals(
new ExtendedBlock(POOL_A, BLOCK_1_GS1),
new ExtendedBlock(POOL_A, BLOCK_1_GS2));
}
private static void assertNotEquals(Object a, Object b) {
assertFalse("expected not equal: '" + a + "' and '" + b + "'",
a.equals(b));
}
}

View File

@ -39,10 +39,16 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3854. Fixed and reenabled tests related to MR child JVM's
environmental variables in TestMiniMRChildTask. (Tom White via vinodkv)
MAPREDUCE-3877 Add a test to formalise the current state transitions
of the yarn lifecycle. (stevel)
OPTIMIZATIONS
BUG FIXES
MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering
DeletionService threads (Jason Lowe via bobby)
MAPREDUCE-3680. FifoScheduler web service rest API can print out invalid
JSON. (B Anil Kumar via tgraves)
@ -52,11 +58,16 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3864. Fix cluster setup docs for correct SecondaryNameNode
HTTPS parameters. (todd)
MAPREDUCE-3856. Instances of RunningJob class give incorrect job tracking
urls when multiple jobs are submitted from the same client jvm. (Eric Payne via
sseth)
MAPREDUCE-3583. Change pid to String and stime to BigInteger in order to
avoid NumberFormatException caused by overflow. (Zhihong Yu via szetszwo)
MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when
their EventHandlers get exceptions. (vinodkv)
MAPREDUCE-3798. Fixed failing TestJobCleanup.testCusomCleanup() and moved it
to the maven build. (Ravi Prakash via vinodkv)
Release 0.23.1 - 2012-02-08
Release 0.23.1 - 2012-02-17
NEW FEATURES
@ -807,6 +818,12 @@ Release 0.23.1 - 2012-02-08
MAPREDUCE-3858. Task attempt failure during commit results in task never completing.
(Tom White via mahadev)
MAPREDUCE-3856. Instances of RunningJob class give incorrect job tracking
urls when multiple jobs are submitted from the same client jvm. (Eric Payne via
sseth)
MAPREDUCE-3880. Changed LCE binary to be 32-bit. (acmurthy)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -188,6 +188,8 @@ public class MRAppMaster extends CompositeService {
@Override
public void init(final Configuration conf) {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
downloadTokensAndSetupUGI(conf);
context = new RunningAppContext(conf);
@ -379,6 +381,7 @@ public class MRAppMaster extends CompositeService {
// this is the only job, so shut down the Appmaster
// note in a workflow scenario, this may lead to creation of a new
// job (FIXME?)
// Send job-end notification
if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
try {
LOG.info("Job end notification started for jobID : "
@ -405,7 +408,6 @@ public class MRAppMaster extends CompositeService {
LOG.info("Calling stop for all the services");
stop();
// Send job-end notification
} catch (Throwable t) {
LOG.warn("Graceful stop failed ", t);
}

View File

@ -61,7 +61,8 @@ public class ContainerLauncherEvent
@Override
public String toString() {
return super.toString() + " for taskAttempt " + taskAttemptID;
return super.toString() + " for container " + containerID + " taskAttempt "
+ taskAttemptID;
}
@Override

View File

@ -334,7 +334,6 @@ public class ContainerLauncherImpl extends AbstractService implements
LOG.error("Returning, interrupted : " + e);
return;
}
int poolSize = launcherPool.getCorePoolSize();
// See if we need up the pool size only if haven't reached the

View File

@ -216,26 +216,18 @@ public class RecoveryService extends CompositeService implements Recovery {
protected Dispatcher createRecoveryDispatcher() {
return new RecoveryDispatcher();
}
protected Dispatcher createRecoveryDispatcher(boolean exitOnException) {
return new RecoveryDispatcher(exitOnException);
}
@SuppressWarnings("rawtypes")
class RecoveryDispatcher extends AsyncDispatcher {
private final EventHandler actualHandler;
private final EventHandler handler;
RecoveryDispatcher(boolean exitOnException) {
super(exitOnException);
RecoveryDispatcher() {
super();
actualHandler = super.getEventHandler();
handler = new InterceptingEventHandler(actualHandler);
}
RecoveryDispatcher() {
this(false);
}
@Override
@SuppressWarnings("unchecked")
public void dispatch(Event event) {

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
@ -49,6 +51,7 @@ import org.junit.Test;
* Tests the state machine with respect to Job/Task/TaskAttempt failure
* scenarios.
*/
@SuppressWarnings("unchecked")
public class TestFail {
@Test
@ -247,10 +250,17 @@ public class TestFail {
//when attempt times out, heartbeat handler will send the lost event
//leading to Attempt failure
return new TaskAttemptListenerImpl(getContext(), null) {
@Override
public void startRpcServer(){};
@Override
public void stopRpcServer(){};
@Override
public InetSocketAddress getAddress() {
return NetUtils.createSocketAddr("localhost", 1234);
}
public void init(Configuration conf) {
conf.setInt("mapreduce.task.timeout", 1*1000);//reduce timeout
conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout
conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000);
super.init(conf);
}
};

View File

@ -54,12 +54,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
@ -724,13 +719,6 @@ public class TestRecovery {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
}
@Override
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryServiceWithCustomDispatcher(
appContext.getApplicationAttemptId(), appContext.getClock(),
getCommitter());
}
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
MockContainerLauncher launcher = new MockContainerLauncher() {
@ -757,21 +745,6 @@ public class TestRecovery {
}
}
static class RecoveryServiceWithCustomDispatcher extends RecoveryService {
public RecoveryServiceWithCustomDispatcher(
ApplicationAttemptId applicationAttemptId, Clock clock,
OutputCommitter committer) {
super(applicationAttemptId, clock, committer);
}
@Override
public Dispatcher createRecoveryDispatcher() {
return super.createRecoveryDispatcher(false);
}
}
public static void main(String[] arg) throws Exception {
TestRecovery test = new TestRecovery();
test.testCrashed();

View File

@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@ -64,8 +65,6 @@ public class TestContainerLauncher {
appId, 3);
JobId jobId = MRBuilderUtils.newJobId(appId, 8);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10);
AppContext context = mock(AppContext.class);
CustomContainerLauncher containerLauncher = new CustomContainerLauncher(
@ -83,6 +82,8 @@ public class TestContainerLauncher {
containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
for (int i = 0; i < 10; i++) {
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, i);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host" + i + ":1234", null,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@ -92,9 +93,21 @@ public class TestContainerLauncher {
Assert.assertNull(containerLauncher.foundErrors);
// Same set of hosts, so no change
containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
containerLauncher.finishEventHandling = true;
int timeOut = 0;
while (containerLauncher.numEventsProcessed.get() < 10 && timeOut++ < 200) {
LOG.info("Waiting for number of events processed to become " + 10
+ ". It is now " + containerLauncher.numEventsProcessed.get()
+ ". Timeout is " + timeOut);
Thread.sleep(1000);
}
Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
containerLauncher.finishEventHandling = false;
for (int i = 0; i < 10; i++) {
ContainerId containerId =
BuilderUtils.newContainerId(appAttemptId, i + 10);
TaskAttemptId taskAttemptId =
MRBuilderUtils.newTaskAttemptId(taskId, i + 10);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host" + i + ":1234", null,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@ -106,14 +119,16 @@ public class TestContainerLauncher {
// Different hosts, there should be an increase in core-thread-pool size to
// 21(11hosts+10buffer)
// Core pool size should be 21 but the live pool size should be only 11.
containerLauncher.expectedCorePoolSize = 12 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
for (int i = 1; i <= 2; i++) {
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host1" + i + ":1234", null,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
}
waitForEvents(containerLauncher, 22);
Assert.assertEquals(12, threadPool.getPoolSize());
containerLauncher.expectedCorePoolSize =
11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
containerLauncher.finishEventHandling = false;
ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21);
TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host11:1234", null,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
waitForEvents(containerLauncher, 21);
Assert.assertEquals(11, threadPool.getPoolSize());
Assert.assertNull(containerLauncher.foundErrors);
containerLauncher.stop();
@ -172,15 +187,15 @@ public class TestContainerLauncher {
private void waitForEvents(CustomContainerLauncher containerLauncher,
int expectedNumEvents) throws InterruptedException {
int timeOut = 20;
while (expectedNumEvents != containerLauncher.numEventsProcessed
|| timeOut++ < 20) {
int timeOut = 0;
while (containerLauncher.numEventsProcessing.get() < expectedNumEvents
&& timeOut++ < 20) {
LOG.info("Waiting for number of events to become " + expectedNumEvents
+ ". It is now " + containerLauncher.numEventsProcessed);
+ ". It is now " + containerLauncher.numEventsProcessing.get());
Thread.sleep(1000);
}
Assert
.assertEquals(expectedNumEvents, containerLauncher.numEventsProcessed);
Assert.assertEquals(expectedNumEvents,
containerLauncher.numEventsProcessing.get());
}
@Test
@ -244,9 +259,11 @@ public class TestContainerLauncher {
private final class CustomContainerLauncher extends ContainerLauncherImpl {
private volatile int expectedCorePoolSize = 0;
private volatile int numEventsProcessed = 0;
private AtomicInteger numEventsProcessing = new AtomicInteger(0);
private AtomicInteger numEventsProcessed = new AtomicInteger(0);
private volatile String foundErrors = null;
private volatile boolean finishEventHandling;
private CustomContainerLauncher(AppContext context) {
super(context);
}
@ -255,8 +272,38 @@ public class TestContainerLauncher {
return super.launcherPool;
}
private final class CustomEventProcessor extends
ContainerLauncherImpl.EventProcessor {
private final ContainerLauncherEvent event;
private CustomEventProcessor(ContainerLauncherEvent event) {
super(event);
this.event = event;
}
@Override
public void run() {
// do nothing substantial
LOG.info("Processing the event " + event.toString());
numEventsProcessing.incrementAndGet();
// Stall
while (!finishEventHandling) {
synchronized (this) {
try {
wait(1000);
} catch (InterruptedException e) {
;
}
}
}
numEventsProcessed.incrementAndGet();
}
}
protected ContainerLauncherImpl.EventProcessor createEventProcessor(
ContainerLauncherEvent event) {
final ContainerLauncherEvent event) {
// At this point of time, the EventProcessor is being created and so no
// additional threads would have been created.
@ -266,23 +313,7 @@ public class TestContainerLauncher {
+ launcherPool.getCorePoolSize();
}
return new ContainerLauncherImpl.EventProcessor(event) {
@Override
public void run() {
// do nothing substantial
numEventsProcessed++;
// Stall
synchronized(this) {
try {
while(!finishEventHandling) {
wait(1000);
}
} catch (InterruptedException e) {
;
}
}
}
};
return new CustomEventProcessor(event);
}
}

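The corrected waitForEvents above polls the AtomicInteger with '<' and '&&', so the loop ends as soon as enough events have been seen or the timeout runs out, instead of spinning on the old '!=' / '||' condition. A standalone sketch of that bounded-polling shape; the counter, producer thread, and limits are illustrative:

import java.util.concurrent.atomic.AtomicInteger;

public class BoundedWaitDemo {
  // Poll until the counter reaches 'expected' or 'maxSeconds' have elapsed.
  static boolean waitForCount(AtomicInteger counter, int expected, int maxSeconds)
      throws InterruptedException {
    int waited = 0;
    while (counter.get() < expected && waited++ < maxSeconds) {
      System.out.println("Waiting: have " + counter.get() + ", want " + expected);
      Thread.sleep(1000);
    }
    return counter.get() >= expected;
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger processed = new AtomicInteger(0);
    Thread producer = new Thread(() -> {
      for (int i = 0; i < 5; i++) {
        try {
          Thread.sleep(200);
        } catch (InterruptedException ignored) {
        }
        processed.incrementAndGet();
      }
    });
    producer.start();
    System.out.println("all events seen: " + waitForCount(processed, 5, 10));
  }
}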
View File

@ -23,6 +23,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -91,12 +92,14 @@ public class ProcfsBasedProcessTree extends ProcessTree {
// to a test directory.
private String procfsDir;
private Integer pid = -1;
static private String deadPid = "-1";
private String pid = deadPid;
static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*");
private Long cpuTime = 0L;
private boolean setsidUsed = false;
private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
private Map<String, ProcessInfo> processTree = new HashMap<String, ProcessInfo>();
public ProcfsBasedProcessTree(String pid) {
this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
@ -166,19 +169,19 @@ public class ProcfsBasedProcessTree extends ProcessTree {
* @return the process-tree with latest state.
*/
public ProcfsBasedProcessTree getProcessTree() {
if (pid != -1) {
if (!pid.equals(deadPid)) {
// Get the list of processes
List<Integer> processList = getProcessList();
List<String> processList = getProcessList();
Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>();
Map<String, ProcessInfo> allProcessInfo = new HashMap<String, ProcessInfo>();
// cache the processTree to get the age for processes
Map<Integer, ProcessInfo> oldProcs =
new HashMap<Integer, ProcessInfo>(processTree);
Map<String, ProcessInfo> oldProcs =
new HashMap<String, ProcessInfo>(processTree);
processTree.clear();
ProcessInfo me = null;
for (Integer proc : processList) {
for (String proc : processList) {
// Get information for each process
ProcessInfo pInfo = new ProcessInfo(proc);
if (constructProcessInfo(pInfo, procfsDir) != null) {
@ -195,9 +198,9 @@ public class ProcfsBasedProcessTree extends ProcessTree {
}
// Add each process to its parent.
for (Map.Entry<Integer, ProcessInfo> entry : allProcessInfo.entrySet()) {
Integer pID = entry.getKey();
if (pID != 1) {
for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) {
String pID = entry.getKey();
if (!pID.equals("1")) {
ProcessInfo pInfo = entry.getValue();
ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid());
if (parentPInfo != null) {
@ -218,7 +221,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
}
// update age values and compute the number of jiffies since last update
for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
for (Map.Entry<String, ProcessInfo> procs : processTree.entrySet()) {
ProcessInfo oldInfo = oldProcs.get(procs.getKey());
if (procs.getValue() != null) {
procs.getValue().updateJiffy(oldInfo);
@ -242,10 +245,10 @@ public class ProcfsBasedProcessTree extends ProcessTree {
* @return true if the root-process is alive, false otherwise.
*/
public boolean isAlive() {
if (pid == -1) {
if (pid.equals(deadPid)) {
return false;
} else {
return isAlive(pid.toString());
return isAlive(pid);
}
}
@ -256,8 +259,8 @@ public class ProcfsBasedProcessTree extends ProcessTree {
* alive, false otherwise.
*/
public boolean isAnyProcessInTreeAlive() {
for (Integer pId : processTree.keySet()) {
if (isAlive(pId.toString())) {
for (String pId : processTree.keySet()) {
if (isAlive(pId)) {
return true;
}
}
@ -269,9 +272,8 @@ public class ProcfsBasedProcessTree extends ProcessTree {
* @param procfsDir Procfs root dir
*/
static boolean checkPidPgrpidForMatch(String pidStr, String procfsDir) {
Integer pId = Integer.parseInt(pidStr);
// Get information for this process
ProcessInfo pInfo = new ProcessInfo(pId);
ProcessInfo pInfo = new ProcessInfo(pidStr);
pInfo = constructProcessInfo(pInfo, procfsDir);
if (pInfo == null) {
// process group leader may have finished execution, but we still need to
@ -279,14 +281,15 @@ public class ProcfsBasedProcessTree extends ProcessTree {
return true;
}
String pgrpId = pInfo.getPgrpId().toString();
//make sure that pId and its pgrpId match
if (!pInfo.getPgrpId().equals(pId)) {
LOG.warn("Unexpected: Process with PID " + pId +
" is not a process group leader.");
if (!pgrpId.equals(pidStr)) {
LOG.warn("Unexpected: Process with PID " + pidStr +
" is not a process group leader. pgrpId is: " + pInfo.getPgrpId());
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug(pId + " is a process group leader, as expected.");
LOG.debug(pidStr + " is a process group leader, as expected.");
}
return true;
}
@ -324,7 +327,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
*/
public void destroy(boolean inBackground) {
LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
if (pid == -1) {
if (pid.equals(deadPid)) {
return;
}
if (isAlive(pid.toString())) {
@ -347,7 +350,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
}
private static final String PROCESSTREE_DUMP_FORMAT =
"\t|- %d %d %d %d %s %d %d %d %d %s\n";
"\t|- %s %s %d %d %s %d %d %d %d %s\n";
/**
* Get a dump of the process-tree.
@ -458,34 +461,27 @@ public class ProcfsBasedProcessTree extends ProcessTree {
return cpuTime;
}
private static Integer getValidPID(String pid) {
Integer retPid = -1;
try {
retPid = Integer.parseInt(pid);
if (retPid <= 0) {
retPid = -1;
}
} catch (NumberFormatException nfe) {
retPid = -1;
}
return retPid;
private static String getValidPID(String pid) {
if (pid == null) return deadPid;
Matcher m = numberPattern.matcher(pid);
if (m.matches()) return pid;
return deadPid;
}
/**
* Get the list of all processes in the system.
*/
private List<Integer> getProcessList() {
private List<String> getProcessList() {
String[] processDirs = (new File(procfsDir)).list();
List<Integer> processList = new ArrayList<Integer>();
List<String> processList = new ArrayList<String>();
for (String dir : processDirs) {
Matcher m = numberPattern.matcher(dir);
if (!m.matches()) continue;
try {
int pd = Integer.parseInt(dir);
if ((new File(procfsDir, dir)).isDirectory()) {
processList.add(Integer.valueOf(pd));
processList.add(dir);
}
} catch (NumberFormatException n) {
// skip this directory
} catch (SecurityException s) {
// skip this process
}
@ -511,7 +507,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
BufferedReader in = null;
FileReader fReader = null;
try {
File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid()));
File pidDir = new File(procfsDir, pinfo.getPid());
fReader = new FileReader(new File(pidDir, PROCFS_STAT_FILE));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
@ -528,9 +524,9 @@ public class ProcfsBasedProcessTree extends ProcessTree {
boolean mat = m.find();
if (mat) {
// Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss)
pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)),
pinfo.updateProcessInfo(m.group(2), m.group(3),
Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)),
Long.parseLong(m.group(7)), Long.parseLong(m.group(8)),
Long.parseLong(m.group(7)), new BigInteger(m.group(8)),
Long.parseLong(m.group(10)), Long.parseLong(m.group(11)));
} else {
LOG.warn("Unexpected: procfs stat file is not in the expected format"
@ -562,7 +558,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
*/
public String toString() {
StringBuffer pTree = new StringBuffer("[ ");
for (Integer p : processTree.keySet()) {
for (String p : processTree.keySet()) {
pTree.append(p);
pTree.append(" ");
}
@ -575,15 +571,16 @@ public class ProcfsBasedProcessTree extends ProcessTree {
*
*/
private static class ProcessInfo {
private Integer pid; // process-id
private String pid; // process-id
private String name; // command name
private Integer pgrpId; // process group-id
private Integer ppid; // parent process-id
private String ppid; // parent process-id
private Integer sessionId; // session-id
private Long vmem; // virtual memory usage
private Long rssmemPage; // rss memory usage in # of pages
private Long utime = 0L; // # of jiffies in user mode
private Long stime = 0L; // # of jiffies in kernel mode
private final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE);
private BigInteger stime = new BigInteger("0"); // # of jiffies in kernel mode
// how many times has this process been seen alive
private int age;
@ -595,13 +592,13 @@ public class ProcfsBasedProcessTree extends ProcessTree {
private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
public ProcessInfo(int pid) {
this.pid = Integer.valueOf(pid);
public ProcessInfo(String pid) {
this.pid = pid;
// seeing this the first time.
this.age = 1;
}
public Integer getPid() {
public String getPid() {
return pid;
}
@ -613,7 +610,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
return pgrpId;
}
public Integer getPpid() {
public String getPpid() {
return ppid;
}
@ -629,7 +626,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
return utime;
}
public Long getStime() {
public BigInteger getStime() {
return stime;
}
@ -652,8 +649,8 @@ public class ProcfsBasedProcessTree extends ProcessTree {
return false;
}
public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
Integer sessionId, Long utime, Long stime, Long vmem, Long rssmem) {
public void updateProcessInfo(String name, String ppid, Integer pgrpId,
Integer sessionId, Long utime, BigInteger stime, Long vmem, Long rssmem) {
this.name = name;
this.ppid = ppid;
this.pgrpId = pgrpId;
@ -665,8 +662,19 @@ public class ProcfsBasedProcessTree extends ProcessTree {
}
public void updateJiffy(ProcessInfo oldInfo) {
this.dtime = (oldInfo == null ? this.utime + this.stime
: (this.utime + this.stime) - (oldInfo.utime + oldInfo.stime));
if (oldInfo == null) {
BigInteger sum = this.stime.add(BigInteger.valueOf(this.utime));
if (sum.compareTo(MAX_LONG) > 0) {
this.dtime = 0L;
LOG.warn("Sum of stime (" + this.stime + ") and utime (" + this.utime
+ ") is greater than " + Long.MAX_VALUE);
} else {
this.dtime = sum.longValue();
}
return;
}
this.dtime = (this.utime - oldInfo.utime +
this.stime.subtract(oldInfo.stime).longValue());
}
public void updateAge(ProcessInfo oldInfo) {
@ -690,7 +698,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
FileReader fReader = null;
try {
fReader =
new FileReader(new File(new File(procfsDir, pid.toString()),
new FileReader(new File(new File(procfsDir, pid),
PROCFS_CMDLINE_FILE));
} catch (FileNotFoundException f) {
// The process vanished in the interim!

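MAPREDUCE-3583, applied above, keeps the pid as a String and the kernel-mode jiffy count as a BigInteger, because values read from /proc/<pid>/stat can exceed Long.MAX_VALUE and previously caused a NumberFormatException. A tiny standalone sketch of the overflow-safe sum that updateJiffy performs; the numbers are made up:

import java.math.BigInteger;

public class JiffyOverflowDemo {
  public static void main(String[] args) {
    // A kernel-mode jiffy count larger than Long.MAX_VALUE (value is made up).
    BigInteger stime = new BigInteger("18446744073709551615");
    long utime = 4321L;
    BigInteger sum = stime.add(BigInteger.valueOf(utime));
    long dtime;
    if (sum.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
      dtime = 0L;   // clamp and warn, as the patched updateJiffy does
    } else {
      dtime = sum.longValue();
    }
    System.out.println("dtime = " + dtime);
  }
}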
View File

@ -30,6 +30,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.service.CompositeService;
/******************************************************************
@ -51,6 +52,9 @@ public class JobHistoryServer extends CompositeService {
@Override
public synchronized void init(Configuration conf) {
Configuration config = new YarnConfiguration(conf);
config.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
try {
doSecureLogin(conf);
} catch(IOException ie) {

View File

@ -53,7 +53,6 @@ public class TestJobHistoryEvents {
@Test
public void testHistoryEvents() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.USER_NAME, "test");
MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
@@ -102,7 +101,6 @@ public class TestJobHistoryEvents {
public void testEventsFlushOnStop() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.USER_NAME, "test");
MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
.getClass().getName(), true);
app.submit(conf);

View File

@@ -22,10 +22,8 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,64 +31,71 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* A JUnit test to test Map-Reduce job cleanup.
*/
public class TestJobCleanup extends TestCase {
private static String TEST_ROOT_DIR =
new File(System.getProperty("test.build.data", "/tmp") + "/"
+ "test-job-cleanup").toString();
private static final String CUSTOM_CLEANUP_FILE_NAME =
"_custom_cleanup";
private static final String ABORT_KILLED_FILE_NAME =
"_custom_abort_killed";
private static final String ABORT_FAILED_FILE_NAME =
"_custom_abort_failed";
@SuppressWarnings("deprecation")
public class TestJobCleanup {
private static String TEST_ROOT_DIR = new File(System.getProperty(
"test.build.data", "/tmp") + "/" + "test-job-cleanup").toString();
private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup";
private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed";
private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed";
private static FileSystem fileSys = null;
private static MiniMRCluster mr = null;
private static Path inDir = null;
private static Path emptyInDir = null;
private static int outDirs = 0;
public static Test suite() {
TestSetup setup = new TestSetup(new TestSuite(TestJobCleanup.class)) {
protected void setUp() throws Exception {
JobConf conf = new JobConf();
fileSys = FileSystem.get(conf);
fileSys.delete(new Path(TEST_ROOT_DIR), true);
conf.set("mapred.job.tracker.handler.count", "1");
conf.set("mapred.job.tracker", "127.0.0.1:0");
conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
inDir = new Path(TEST_ROOT_DIR, "test-input");
String input = "The quick brown fox\n" + "has many silly\n"
+ "red fox sox\n";
DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0));
file.writeBytes(input);
file.close();
emptyInDir = new Path(TEST_ROOT_DIR, "empty-input");
fileSys.mkdirs(emptyInDir);
}
protected void tearDown() throws Exception {
if (fileSys != null) {
fileSys.delete(new Path(TEST_ROOT_DIR), true);
fileSys.close();
}
if (mr != null) {
mr.shutdown();
}
}
};
return setup;
private static Log LOG = LogFactory.getLog(TestJobCleanup.class);
@BeforeClass
public static void setUp() throws IOException {
JobConf conf = new JobConf();
fileSys = FileSystem.get(conf);
fileSys.delete(new Path(TEST_ROOT_DIR), true);
conf.set("mapred.job.tracker.handler.count", "1");
conf.set("mapred.job.tracker", "127.0.0.1:0");
conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, TEST_ROOT_DIR +
"/intermediate");
conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true");
mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
inDir = new Path(TEST_ROOT_DIR, "test-input");
String input = "The quick brown fox\n" + "has many silly\n"
+ "red fox sox\n";
DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0));
file.writeBytes(input);
file.close();
emptyInDir = new Path(TEST_ROOT_DIR, "empty-input");
fileSys.mkdirs(emptyInDir);
}
/**
* Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
* making a _failed/_killed in the output folder
@AfterClass
public static void tearDown() throws Exception {
if (fileSys != null) {
// fileSys.delete(new Path(TEST_ROOT_DIR), true);
fileSys.close();
}
if (mr != null) {
mr.shutdown();
}
}
/**
* Committer with deprecated
* {@link FileOutputCommitter#cleanupJob(JobContext)} making a _failed/_killed
* in the output folder
*/
static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
@Override
@@ -101,31 +106,40 @@ public class TestJobCleanup extends TestCase {
FileSystem fs = outputPath.getFileSystem(conf);
fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
}
@Override
public void commitJob(JobContext context) throws IOException {
cleanupJob(context);
}
@Override
public void abortJob(JobContext context, int i) throws IOException {
cleanupJob(context);
}
}
/**
/**
* Committer with abort making a _failed/_killed in the output folder
*/
static class CommitterWithCustomAbort extends FileOutputCommitter {
@Override
public void abortJob(JobContext context, int state)
throws IOException {
JobConf conf = context.getJobConf();;
public void abortJob(JobContext context, int state) throws IOException {
JobConf conf = context.getJobConf();
;
Path outputPath = FileOutputFormat.getOutputPath(conf);
FileSystem fs = outputPath.getFileSystem(conf);
String fileName = (state == JobStatus.FAILED)
? TestJobCleanup.ABORT_FAILED_FILE_NAME
: TestJobCleanup.ABORT_KILLED_FILE_NAME;
String fileName = (state == JobStatus.FAILED) ? TestJobCleanup.ABORT_FAILED_FILE_NAME
: TestJobCleanup.ABORT_KILLED_FILE_NAME;
fs.create(new Path(outputPath, fileName)).close();
}
}
private Path getNewOutputDir() {
return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
}
private void configureJob(JobConf jc, String jobName, int maps, int reds,
Path outDir) {
private void configureJob(JobConf jc, String jobName, int maps, int reds,
Path outDir) {
jc.setJobName(jobName);
jc.setInputFormat(TextInputFormat.class);
jc.setOutputKeyClass(LongWritable.class);
@@ -137,36 +151,38 @@ public class TestJobCleanup extends TestCase {
jc.setNumMapTasks(maps);
jc.setNumReduceTasks(reds);
}
// run a job with 1 map and let it run to completion
private void testSuccessfulJob(String filename,
Class<? extends OutputCommitter> committer, String[] exclude)
throws IOException {
private void testSuccessfulJob(String filename,
Class<? extends OutputCommitter> committer, String[] exclude)
throws IOException {
JobConf jc = mr.createJobConf();
Path outDir = getNewOutputDir();
configureJob(jc, "job with cleanup()", 1, 0, outDir);
jc.setOutputCommitter(committer);
JobClient jobClient = new JobClient(jc);
RunningJob job = jobClient.submitJob(jc);
JobID id = job.getID();
job.waitForCompletion();
LOG.info("Job finished : " + job.isComplete());
Path testFile = new Path(outDir, filename);
assertTrue("Done file missing for job " + id, fileSys.exists(testFile));
assertTrue("Done file \"" + testFile + "\" missing for job " + id,
fileSys.exists(testFile));
// check if the files from the missing set exists
for (String ex : exclude) {
Path file = new Path(outDir, ex);
assertFalse("File " + file + " should not be present for successful job "
+ id, fileSys.exists(file));
assertFalse("File " + file + " should not be present for successful job "
+ id, fileSys.exists(file));
}
}
// run a job for which all the attempts simply fail.
private void testFailedJob(String fileName,
Class<? extends OutputCommitter> committer, String[] exclude)
throws IOException {
private void testFailedJob(String fileName,
Class<? extends OutputCommitter> committer, String[] exclude)
throws IOException {
JobConf jc = mr.createJobConf();
Path outDir = getNewOutputDir();
configureJob(jc, "fail job with abort()", 1, 0, outDir);
@@ -179,128 +195,129 @@ public class TestJobCleanup extends TestCase {
RunningJob job = jobClient.submitJob(jc);
JobID id = job.getID();
job.waitForCompletion();
if (fileName != null) {
Path testFile = new Path(outDir, fileName);
assertTrue("File " + testFile + " missing for failed job " + id,
fileSys.exists(testFile));
assertTrue("File " + testFile + " missing for failed job " + id,
fileSys.exists(testFile));
}
// check if the files from the missing set exists
for (String ex : exclude) {
Path file = new Path(outDir, ex);
assertFalse("File " + file + " should not be present for failed job "
+ id, fileSys.exists(file));
+ id, fileSys.exists(file));
}
}
// run a job which gets stuck in mapper and kill it.
private void testKilledJob(String fileName,
Class<? extends OutputCommitter> committer, String[] exclude)
throws IOException {
Class<? extends OutputCommitter> committer, String[] exclude)
throws IOException {
JobConf jc = mr.createJobConf();
Path outDir = getNewOutputDir();
configureJob(jc, "kill job with abort()", 1, 0, outDir);
// set the job to wait for long
jc.setMapperClass(UtilsForTests.KillMapper.class);
jc.setOutputCommitter(committer);
JobClient jobClient = new JobClient(jc);
RunningJob job = jobClient.submitJob(jc);
JobID id = job.getID();
JobInProgress jip =
mr.getJobTrackerRunner().getJobTracker().getJob(job.getID());
Counters counters = job.getCounters();
// wait for the map to be launched
while (true) {
if (jip.runningMaps() == 1) {
if (counters.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS) == 1) {
break;
}
LOG.info("Waiting for a map task to be launched");
UtilsForTests.waitFor(100);
counters = job.getCounters();
}
job.killJob(); // kill the job
job.waitForCompletion(); // wait for the job to complete
if (fileName != null) {
Path testFile = new Path(outDir, fileName);
assertTrue("File " + testFile + " missing for job " + id,
fileSys.exists(testFile));
assertTrue("File " + testFile + " missing for job " + id,
fileSys.exists(testFile));
}
// check if the files from the missing set exists
for (String ex : exclude) {
Path file = new Path(outDir, ex);
assertFalse("File " + file + " should not be present for killed job "
+ id, fileSys.exists(file));
+ id, fileSys.exists(file));
}
}
/**
* Test default cleanup/abort behavior
*
*
* @throws IOException
*/
@Test
public void testDefaultCleanupAndAbort() throws IOException {
// check with a successful job
testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
FileOutputCommitter.class,
new String[] {});
FileOutputCommitter.class, new String[] {});
// check with a failed job
testFailedJob(null,
FileOutputCommitter.class,
new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
testFailedJob(null, FileOutputCommitter.class,
new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
// check default abort job kill
testKilledJob(null,
FileOutputCommitter.class,
new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
testKilledJob(null, FileOutputCommitter.class,
new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
}
/**
* Test if a failed job with custom committer runs the abort code.
*
*
* @throws IOException
*/
@Test
public void testCustomAbort() throws IOException {
// check with a successful job
testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
CommitterWithCustomAbort.class,
new String[] {ABORT_FAILED_FILE_NAME,
ABORT_KILLED_FILE_NAME});
testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
CommitterWithCustomAbort.class, new String[] { ABORT_FAILED_FILE_NAME,
ABORT_KILLED_FILE_NAME });
// check with a failed job
testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class,
new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
ABORT_KILLED_FILE_NAME});
testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class,
new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME,
ABORT_KILLED_FILE_NAME });
// check with a killed job
testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class,
new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
ABORT_FAILED_FILE_NAME});
testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class,
new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME,
ABORT_FAILED_FILE_NAME });
}
/**
* Test if a failed job with custom committer runs the deprecated
* {@link FileOutputCommitter#cleanupJob(JobContext)} code for api
* {@link FileOutputCommitter#cleanupJob(JobContext)} code for api
* compatibility testing.
*/
@Test
public void testCustomCleanup() throws IOException {
// check with a successful job
testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME,
CommitterWithCustomDeprecatedCleanup.class,
new String[] {});
// check with a failed job
testFailedJob(CUSTOM_CLEANUP_FILE_NAME,
CommitterWithCustomDeprecatedCleanup.class,
new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
// check with a killed job
testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME,
CommitterWithCustomDeprecatedCleanup.class,
new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME,
CommitterWithCustomDeprecatedCleanup.class,
new String[] {});
// check with a failed job
testFailedJob(CUSTOM_CLEANUP_FILE_NAME,
CommitterWithCustomDeprecatedCleanup.class,
new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
// check with a killed job
testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME,
CommitterWithCustomDeprecatedCleanup.class,
new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
}
}

View File

@@ -193,6 +193,12 @@
<Method name="dispatch" />
<Bug pattern="DM_EXIT" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$SchedulerEventDispatcher$EventProcessor" />
<Method name="run" />
<Bug pattern="DM_EXIT" />
</Match>
<!-- Ignore heartbeat exception when killing localizer -->
<Match>

View File

@@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -48,22 +49,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
private boolean exitOnDispatchException;
public AsyncDispatcher() {
this(new HashMap<Class<? extends Enum>, EventHandler>(),
new LinkedBlockingQueue<Event>(), true);
}
public AsyncDispatcher(boolean exitOnException) {
this(new HashMap<Class<? extends Enum>, EventHandler>(),
new LinkedBlockingQueue<Event>(), exitOnException);
this(new LinkedBlockingQueue<Event>());
}
AsyncDispatcher(
Map<Class<? extends Enum>, EventHandler> eventDispatchers,
BlockingQueue<Event> eventQueue, boolean exitOnException) {
public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
super("Dispatcher");
this.eventQueue = eventQueue;
this.eventDispatchers = eventDispatchers;
this.exitOnDispatchException = exitOnException;
this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
}
Runnable createThread() {
@@ -86,6 +78,14 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
};
}
@Override
public synchronized void init(Configuration conf) {
this.exitOnDispatchException =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.init(conf);
}
@Override
public void start() {
//start all the components
@@ -103,7 +103,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
try {
eventHandlingThread.join();
} catch (InterruptedException ie) {
LOG.debug("Interrupted Exception while stopping", ie);
LOG.warn("Interrupted Exception while stopping", ie);
}
}
@@ -126,8 +126,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread. Exiting..", t);
LOG.fatal("Error in dispatcher thread", t);
if (exitOnDispatchException) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
@@ -177,6 +178,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
LOG.warn("AsyncDispatcher thread interrupted", e);
throw new YarnException(e);
}
};

View File

@@ -23,8 +23,18 @@ package org.apache.hadoop.yarn.event;
* event handlers based on event types.
*
*/
@SuppressWarnings("rawtypes")
public interface Dispatcher {
// Configuration to make sure dispatcher crashes but doesn't do system-exit in
// case of errors. By default, it should be false, so that tests are not
// affected. For all daemons it should be explicitly set to true so that
// daemons can crash instead of hanging around.
public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
"yarn.dispatcher.exit-on-error";
public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
EventHandler getEventHandler();
void register(Class<? extends Enum> eventType, EventHandler handler);
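The comment above spells out the intended split: the flag stays false by default so tests survive dispatch failures, while daemons set it to true before initialising their dispatcher, exactly as NodeManager, ResourceManager and JobHistoryServer do elsewhere in this commit. A minimal sketch of that daemon-side pattern, assuming a hypothetical MyDaemonService class (the config key, AsyncDispatcher and CompositeService are taken from the diff itself):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.service.CompositeService;

// Hypothetical daemon service; the pattern mirrors NodeManager/ResourceManager.
public class MyDaemonService extends CompositeService {

  public MyDaemonService() {
    super("MyDaemonService");
  }

  @Override
  public synchronized void init(Configuration conf) {
    // Daemons crash on dispatcher errors; tests keep the default (false).
    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
    addService(new AsyncDispatcher()); // AsyncDispatcher.init(conf) reads the key
    super.init(conf);
  }
}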

View File

@@ -23,6 +23,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -91,12 +92,14 @@ public class ProcfsBasedProcessTree {
// to a test directory.
private String procfsDir;
protected final Integer pid;
static private String deadPid = "-1";
private String pid = deadPid;
static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*");
private Long cpuTime = 0L;
private boolean setsidUsed = false;
protected Map<Integer, ProcessInfo> processTree =
new HashMap<Integer, ProcessInfo>();
protected Map<String, ProcessInfo> processTree =
new HashMap<String, ProcessInfo>();
public ProcfsBasedProcessTree(String pid) {
this(pid, false);
@@ -150,19 +153,19 @@ public class ProcfsBasedProcessTree {
* @return the process-tree with latest state.
*/
public ProcfsBasedProcessTree getProcessTree() {
if (pid != -1) {
if (!pid.equals(deadPid)) {
// Get the list of processes
List<Integer> processList = getProcessList();
List<String> processList = getProcessList();
Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>();
Map<String, ProcessInfo> allProcessInfo = new HashMap<String, ProcessInfo>();
// cache the processTree to get the age for processes
Map<Integer, ProcessInfo> oldProcs =
new HashMap<Integer, ProcessInfo>(processTree);
Map<String, ProcessInfo> oldProcs =
new HashMap<String, ProcessInfo>(processTree);
processTree.clear();
ProcessInfo me = null;
for (Integer proc : processList) {
for (String proc : processList) {
// Get information for each process
ProcessInfo pInfo = new ProcessInfo(proc);
if (constructProcessInfo(pInfo, procfsDir) != null) {
@@ -179,9 +182,9 @@ public class ProcfsBasedProcessTree {
}
// Add each process to its parent.
for (Map.Entry<Integer, ProcessInfo> entry : allProcessInfo.entrySet()) {
Integer pID = entry.getKey();
if (pID != 1) {
for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) {
String pID = entry.getKey();
if (!pID.equals("1")) {
ProcessInfo pInfo = entry.getValue();
ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid());
if (parentPInfo != null) {
@@ -202,7 +205,7 @@ public class ProcfsBasedProcessTree {
}
// update age values and compute the number of jiffies since last update
for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
for (Map.Entry<String, ProcessInfo> procs : processTree.entrySet()) {
ProcessInfo oldInfo = oldProcs.get(procs.getKey());
if (procs.getValue() != null) {
procs.getValue().updateJiffy(oldInfo);
@@ -227,20 +230,22 @@ public class ProcfsBasedProcessTree {
return checkPidPgrpidForMatch(pid, PROCFS);
}
public static boolean checkPidPgrpidForMatch(int _pid, String procfs) {
public static boolean checkPidPgrpidForMatch(String _pid, String procfs) {
// Get information for this process
ProcessInfo pInfo = new ProcessInfo(_pid);
pInfo = constructProcessInfo(pInfo, procfs);
// null if process group leader finished execution; issue no warning
// make sure that pid and its pgrpId match
return pInfo == null || pInfo.getPgrpId().equals(_pid);
if (pInfo == null) return true;
String pgrpId = pInfo.getPgrpId().toString();
return pgrpId.equals(_pid);
}
private static final String PROCESSTREE_DUMP_FORMAT =
"\t|- %d %d %d %d %s %d %d %d %d %s\n";
"\t|- %s %s %d %d %s %d %d %d %d %s\n";
public List<Integer> getCurrentProcessIDs() {
List<Integer> currentPIDs = new ArrayList<Integer>();
public List<String> getCurrentProcessIDs() {
List<String> currentPIDs = new ArrayList<String>();
currentPIDs.addAll(processTree.keySet());
return currentPIDs;
}
@@ -354,34 +359,27 @@ public class ProcfsBasedProcessTree {
return cpuTime;
}
private static Integer getValidPID(String pid) {
Integer retPid = -1;
try {
retPid = Integer.parseInt(pid);
if (retPid <= 0) {
retPid = -1;
}
} catch (NumberFormatException nfe) {
retPid = -1;
}
return retPid;
private static String getValidPID(String pid) {
if (pid == null) return deadPid;
Matcher m = numberPattern.matcher(pid);
if (m.matches()) return pid;
return deadPid;
}
/**
* Get the list of all processes in the system.
*/
private List<Integer> getProcessList() {
private List<String> getProcessList() {
String[] processDirs = (new File(procfsDir)).list();
List<Integer> processList = new ArrayList<Integer>();
List<String> processList = new ArrayList<String>();
for (String dir : processDirs) {
Matcher m = numberPattern.matcher(dir);
if (!m.matches()) continue;
try {
int pd = Integer.parseInt(dir);
if ((new File(procfsDir, dir)).isDirectory()) {
processList.add(Integer.valueOf(pd));
processList.add(dir);
}
} catch (NumberFormatException n) {
// skip this directory
} catch (SecurityException s) {
// skip this process
}
@@ -407,7 +405,7 @@ public class ProcfsBasedProcessTree {
BufferedReader in = null;
FileReader fReader = null;
try {
File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid()));
File pidDir = new File(procfsDir, pinfo.getPid());
fReader = new FileReader(new File(pidDir, PROCFS_STAT_FILE));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
@@ -424,9 +422,9 @@ public class ProcfsBasedProcessTree {
boolean mat = m.find();
if (mat) {
// Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss)
pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)),
pinfo.updateProcessInfo(m.group(2), m.group(3),
Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)),
Long.parseLong(m.group(7)), Long.parseLong(m.group(8)),
Long.parseLong(m.group(7)), new BigInteger(m.group(8)),
Long.parseLong(m.group(10)), Long.parseLong(m.group(11)));
} else {
LOG.warn("Unexpected: procfs stat file is not in the expected format"
@@ -458,7 +456,7 @@ public class ProcfsBasedProcessTree {
*/
public String toString() {
StringBuffer pTree = new StringBuffer("[ ");
for (Integer p : processTree.keySet()) {
for (String p : processTree.keySet()) {
pTree.append(p);
pTree.append(" ");
}
@@ -471,15 +469,16 @@ public class ProcfsBasedProcessTree {
*
*/
private static class ProcessInfo {
private Integer pid; // process-id
private String pid; // process-id
private String name; // command name
private Integer pgrpId; // process group-id
private Integer ppid; // parent process-id
private String ppid; // parent process-id
private Integer sessionId; // session-id
private Long vmem; // virtual memory usage
private Long rssmemPage; // rss memory usage in # of pages
private Long utime = 0L; // # of jiffies in user mode
private Long stime = 0L; // # of jiffies in kernel mode
private final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE);
private BigInteger stime = new BigInteger("0"); // # of jiffies in kernel mode
// how many times has this process been seen alive
private int age;
@@ -491,13 +490,13 @@ public class ProcfsBasedProcessTree {
private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
public ProcessInfo(int pid) {
this.pid = Integer.valueOf(pid);
public ProcessInfo(String pid) {
this.pid = pid;
// seeing this the first time.
this.age = 1;
}
public Integer getPid() {
public String getPid() {
return pid;
}
@@ -509,7 +508,7 @@ public class ProcfsBasedProcessTree {
return pgrpId;
}
public Integer getPpid() {
public String getPpid() {
return ppid;
}
@@ -525,7 +524,7 @@ public class ProcfsBasedProcessTree {
return utime;
}
public Long getStime() {
public BigInteger getStime() {
return stime;
}
@@ -548,8 +547,8 @@ public class ProcfsBasedProcessTree {
return false;
}
public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
Integer sessionId, Long utime, Long stime, Long vmem, Long rssmem) {
public void updateProcessInfo(String name, String ppid, Integer pgrpId,
Integer sessionId, Long utime, BigInteger stime, Long vmem, Long rssmem) {
this.name = name;
this.ppid = ppid;
this.pgrpId = pgrpId;
@@ -559,10 +558,21 @@ public class ProcfsBasedProcessTree {
this.vmem = vmem;
this.rssmemPage = rssmem;
}
public void updateJiffy(ProcessInfo oldInfo) {
this.dtime = (oldInfo == null ? this.utime + this.stime
: (this.utime + this.stime) - (oldInfo.utime + oldInfo.stime));
if (oldInfo == null) {
BigInteger sum = this.stime.add(BigInteger.valueOf(this.utime));
if (sum.compareTo(MAX_LONG) > 0) {
this.dtime = 0L;
LOG.warn("Sum of stime (" + this.stime + ") and utime (" + this.utime
+ ") is greater than " + Long.MAX_VALUE);
} else {
this.dtime = sum.longValue();
}
return;
}
this.dtime = (this.utime - oldInfo.utime +
this.stime.subtract(oldInfo.stime).longValue());
}
public void updateAge(ProcessInfo oldInfo) {

View File

@@ -17,10 +17,8 @@
*/
package org.apache.hadoop.yarn.event;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("rawtypes")
public class DrainDispatcher extends AsyncDispatcher {
@@ -36,7 +34,7 @@ public class DrainDispatcher extends AsyncDispatcher {
}
private DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue, true);
super(eventQueue);
this.queue = eventQueue;
}

View File

@@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.conf.Configuration;
/**
* This is a service that can be configured to break on any of the lifecycle
* events, so test the failure handling of other parts of the service
* infrastructure.
*
* It retains a counter to the number of times each entry point is called -
* these counters are incremented before the exceptions are raised and
* before the superclass state methods are invoked.
*
*/
public class BreakableService extends AbstractService {
private boolean failOnInit;
private boolean failOnStart;
private boolean failOnStop;
private int[] counts = new int[4];
public BreakableService() {
this(false, false, false);
}
public BreakableService(boolean failOnInit,
boolean failOnStart,
boolean failOnStop) {
super("BreakableService");
this.failOnInit = failOnInit;
this.failOnStart = failOnStart;
this.failOnStop = failOnStop;
inc(STATE.NOTINITED);
}
private int convert(STATE state) {
switch (state) {
case NOTINITED: return 0;
case INITED: return 1;
case STARTED: return 2;
case STOPPED: return 3;
default: return 0;
}
}
private void inc(STATE state) {
int index = convert(state);
counts[index] ++;
}
public int getCount(STATE state) {
return counts[convert(state)];
}
private void maybeFail(boolean fail, String action) {
if (fail) {
throw new BrokenLifecycleEvent(action);
}
}
@Override
public void init(Configuration conf) {
inc(STATE.INITED);
maybeFail(failOnInit, "init");
super.init(conf);
}
@Override
public void start() {
inc(STATE.STARTED);
maybeFail(failOnStart, "start");
super.start();
}
@Override
public void stop() {
inc(STATE.STOPPED);
maybeFail(failOnStop, "stop");
super.stop();
}
public void setFailOnInit(boolean failOnInit) {
this.failOnInit = failOnInit;
}
public void setFailOnStart(boolean failOnStart) {
this.failOnStart = failOnStart;
}
public void setFailOnStop(boolean failOnStop) {
this.failOnStop = failOnStop;
}
/**
* The exception explicitly raised on a failure
*/
public static class BrokenLifecycleEvent extends RuntimeException {
BrokenLifecycleEvent(String action) {
super("Lifecycle Failure during " + action);
}
}
}
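BreakableService is exercised by the new TestServiceLifecycle below; as a quick illustration of the counter semantics described in the class comment, a minimal stand-alone sketch (the test class name here is hypothetical) checks the state and entry count after a forced start() failure:

package org.apache.hadoop.yarn.service;

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

// Illustrative sketch, hypothetical class name: counters are incremented
// before the exception is raised, so a failed start() still counts as one
// STARTED entry while the service itself stays in the INITED state.
public class BreakableServiceUsageSketch {

  @Test
  public void startFailureLeavesServiceInited() {
    BreakableService svc = new BreakableService(false, true, false); // fail on start
    svc.init(new Configuration());
    try {
      svc.start();
    } catch (BreakableService.BrokenLifecycleEvent expected) {
      // expected: super.start() was never reached
    }
    assertEquals(1, svc.getCount(Service.STATE.STARTED));
    assertEquals(Service.STATE.INITED, svc.getServiceState());
  }
}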

View File

@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.junit.Assert;
/**
* A set of assertions about the state of any service
*/
public class ServiceAssert extends Assert {
public static void assertServiceStateCreated(Service service) {
assertServiceInState(service, Service.STATE.NOTINITED);
}
public static void assertServiceStateInited(Service service) {
assertServiceInState(service, Service.STATE.INITED);
}
public static void assertServiceStateStarted(Service service) {
assertServiceInState(service, Service.STATE.STARTED);
}
public static void assertServiceStateStopped(Service service) {
assertServiceInState(service, Service.STATE.STOPPED);
}
public static void assertServiceInState(Service service, Service.STATE state) {
assertNotNull("Null service", service);
assertEquals("Service in wrong state: " + service, state,
service.getServiceState());
}
}

View File

@@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
public class TestServiceLifecycle extends ServiceAssert {
void assertStateCount(BreakableService service,
Service.STATE state,
int expected) {
int actual = service.getCount(state);
if (expected != actual) {
fail("Expected entry count for state [" + state +"] of " + service
+ " to be " + expected + " but was " + actual);
}
}
@Test
public void testWalkthrough() throws Throwable {
BreakableService svc = new BreakableService();
assertServiceStateCreated(svc);
assertStateCount(svc, Service.STATE.NOTINITED, 1);
assertStateCount(svc, Service.STATE.INITED, 0);
assertStateCount(svc, Service.STATE.STARTED, 0);
assertStateCount(svc, Service.STATE.STOPPED, 0);
svc.init(new Configuration());
assertServiceStateInited(svc);
assertStateCount(svc, Service.STATE.INITED, 1);
svc.start();
assertServiceStateStarted(svc);
assertStateCount(svc, Service.STATE.STARTED, 1);
svc.stop();
assertServiceStateStopped(svc);
assertStateCount(svc, Service.STATE.STOPPED, 1);
}
/**
* call init twice
* @throws Throwable
*/
@Test
public void testInitTwice() throws Throwable {
BreakableService svc = new BreakableService();
svc.init(new Configuration());
try {
svc.init(new Configuration());
fail("Expected a failure, got " + svc);
} catch (IllegalStateException e) {
//expected
}
assertStateCount(svc, Service.STATE.INITED, 2);
}
/**
* call start twice
* @throws Throwable
*/
@Test
public void testStartTwice() throws Throwable {
BreakableService svc = new BreakableService();
svc.init(new Configuration());
svc.start();
try {
svc.start();
fail("Expected a failure, got " + svc);
} catch (IllegalStateException e) {
//expected
}
assertStateCount(svc, Service.STATE.STARTED, 2);
}
/**
* verify that when a service is stopped more than once, no exception
* is thrown, and the counter is incremented
* this is because the state change operations happen after the counter in
* the subclass is incremented, even though stop is meant to be a no-op
* @throws Throwable
*/
@Test
public void testStopTwice() throws Throwable {
BreakableService svc = new BreakableService();
svc.init(new Configuration());
svc.start();
svc.stop();
assertStateCount(svc, Service.STATE.STOPPED, 1);
svc.stop();
assertStateCount(svc, Service.STATE.STOPPED, 2);
}
/**
* Show that if the service failed during an init
* operation, it stays in the created state, even after stopping it
* @throws Throwable
*/
@Test
public void testStopFailedInit() throws Throwable {
BreakableService svc = new BreakableService(true, false, false);
assertServiceStateCreated(svc);
try {
svc.init(new Configuration());
fail("Expected a failure, got " + svc);
} catch (BreakableService.BrokenLifecycleEvent e) {
//expected
}
//the service state wasn't passed
assertServiceStateCreated(svc);
assertStateCount(svc, Service.STATE.INITED, 1);
//now try to stop
svc.stop();
//even after the stop operation, we haven't entered the state
assertServiceStateCreated(svc);
}
/**
* Show that if the service failed during an init
* operation, it stays in the created state, even after stopping it
* @throws Throwable
*/
@Test
public void testStopFailedStart() throws Throwable {
BreakableService svc = new BreakableService(false, true, false);
svc.init(new Configuration());
assertServiceStateInited(svc);
try {
svc.start();
fail("Expected a failure, got " + svc);
} catch (BreakableService.BrokenLifecycleEvent e) {
//expected
}
//the service state wasn't passed
assertServiceStateInited(svc);
assertStateCount(svc, Service.STATE.INITED, 1);
//now try to stop
svc.stop();
//even after the stop operation, we haven't entered the state
assertServiceStateInited(svc);
}
/**
* verify that when a service is stopped more than once, no exception
* is thrown, and the counter is incremented
* this is because the state change operations happen after the counter in
* the subclass is incremented, even though stop is meant to be a no-op
* @throws Throwable
*/
@Test
public void testFailingStop() throws Throwable {
BreakableService svc = new BreakableService(false, false, true);
svc.init(new Configuration());
svc.start();
try {
svc.stop();
fail("Expected a failure, got " + svc);
} catch (BreakableService.BrokenLifecycleEvent e) {
//expected
}
assertStateCount(svc, Service.STATE.STOPPED, 1);
//now try again, and expect it to happen again
try {
svc.stop();
fail("Expected a failure, got " + svc);
} catch (BreakableService.BrokenLifecycleEvent e) {
//expected
}
assertStateCount(svc, Service.STATE.STOPPED, 2);
}
}

View File

@@ -527,7 +527,7 @@ public class TestProcfsBasedProcessTree {
// Let us not create stat file for pid 100.
Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(
Integer.valueOf(pid), procfsRootDir.getAbsolutePath()));
pid, procfsRootDir.getAbsolutePath()));
} finally {
FileUtil.fullyDelete(procfsRootDir);
}
@@ -662,8 +662,8 @@ public class TestProcfsBasedProcessTree {
*/
private static boolean isAnyProcessInTreeAlive(
ProcfsBasedProcessTree processTree) {
for (Integer pId : processTree.getCurrentProcessIDs()) {
if (isAlive(pId.toString())) {
for (String pId : processTree.getCurrentProcessIDs()) {
if (isAlive(pId)) {
return true;
}
}

View File

@@ -74,7 +74,7 @@
<configureEnvironment>
<property>
<name>CFLAGS</name>
<value>-DHADOOP_CONF_DIR=${container-executor.conf.dir}</value>
<value>-DHADOOP_CONF_DIR=${container-executor.conf.dir} -m32</value>
</property>
</configureEnvironment>
<configureWorkDir>${project.build.directory}/native/container-executor</configureWorkDir>

View File

@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
@ -85,6 +86,7 @@ public class DeletionService extends AbstractService {
sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT,
tf);
}
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
sched.setKeepAliveTime(60L, SECONDS);
super.init(conf);
}
@@ -92,14 +94,27 @@
@Override
public void stop() {
sched.shutdown();
boolean terminated = false;
try {
sched.awaitTermination(10, SECONDS);
terminated = sched.awaitTermination(10, SECONDS);
} catch (InterruptedException e) {
}
if (terminated != true) {
sched.shutdownNow();
}
super.stop();
}
/**
* Determine if the service has completely stopped.
* Used only by unit tests
* @return true if service has completely stopped
*/
@Private
public boolean isTerminated() {
return getServiceState() == STATE.STOPPED && sched.isTerminated();
}
private class FileDeletion implements Runnable {
final String user;
final Path subDir;

View File

@@ -99,6 +99,8 @@ public class NodeManager extends CompositeService implements
@Override
public void init(Configuration conf) {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
Context context = new NMContext();
// Create the secretManager if need be.

View File

@@ -137,6 +137,7 @@ public class ContainerManagerImpl extends CompositeService implements
this.context = context;
this.dirsHandler = dirsHandler;
// ContainerManager level dispatcher.
dispatcher = new AsyncDispatcher();
this.deletionService = deletionContext;
this.metrics = metrics;

View File

@@ -27,12 +27,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.*;
public class TestDeletionService {
@@ -107,12 +110,18 @@ public class TestDeletionService {
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
p, null);
}
int msecToWait = 20 * 1000;
for (Path p : dirs) {
while (msecToWait > 0 && lfs.util().exists(p)) {
Thread.sleep(100);
msecToWait -= 100;
}
assertFalse(lfs.util().exists(p));
}
} finally {
del.stop();
}
for (Path p : dirs) {
assertFalse(lfs.util().exists(p));
}
}
@Test
@@ -137,14 +146,35 @@ public class TestDeletionService {
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
p, baseDirs.toArray(new Path[4]));
}
int msecToWait = 20 * 1000;
for (Path p : baseDirs) {
for (Path q : content) {
Path fp = new Path(p, q);
while (msecToWait > 0 && lfs.util().exists(fp)) {
Thread.sleep(100);
msecToWait -= 100;
}
assertFalse(lfs.util().exists(fp));
}
}
} finally {
del.stop();
}
for (Path p : baseDirs) {
for (Path q : content) {
assertFalse(lfs.util().exists(new Path(p, q)));
}
}
}
@Test
public void testStopWithDelayedTasks() throws Exception {
DeletionService del = new DeletionService(Mockito.mock(ContainerExecutor.class));
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 60);
del.init(conf);
del.start();
try {
del.delete("dingo", new Path("/does/not/exist"));
} finally {
del.stop();
}
assertTrue(del.isTerminated());
}
}

View File

@@ -376,7 +376,7 @@ public class TestApplication {
WrappedApplication(int id, long timestamp, String user, int numContainers) {
dispatcher = new DrainDispatcher();
dispatcher.init(null);
dispatcher.init(new Configuration());
localizerBus = mock(EventHandler.class);
launcherBus = mock(EventHandler.class);

View File

@@ -517,7 +517,7 @@ public class TestContainer {
WrappedContainer(int appId, long timestamp, int id, String user,
boolean withLocalRes, boolean withServiceData) {
dispatcher = new DrainDispatcher();
dispatcher.init(null);
dispatcher.init(new Configuration());
localizerBus = mock(EventHandler.class);
launcherBus = mock(EventHandler.class);

View File

@@ -17,6 +17,15 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -29,19 +38,14 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.junit.Test;
import static org.junit.Assert.*;
import org.mockito.ArgumentMatcher;
import static org.mockito.Mockito.*;
public class TestLocalizedResource {
@@ -62,7 +66,7 @@ public class TestLocalizedResource {
@SuppressWarnings("unchecked") // mocked generic
public void testNotification() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(null);
dispatcher.init(new Configuration());
try {
dispatcher.start();
EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
@@ -175,7 +179,7 @@ public class TestLocalizedResource {
@Test
public void testDirectLocalization() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher();
dispatcher.init(null);
dispatcher.init(new Configuration());
try {
dispatcher.start();
LocalResource apiRsrc = createMockResource();

View File

@ -18,8 +18,23 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.net.InetSocketAddress;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -33,7 +48,6 @@ import java.util.Set;
import junit.framework.Assert;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +56,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -60,7 +75,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -81,13 +95,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
import static org.junit.Assert.*;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import static org.mockito.Mockito.*;
public class TestResourceLocalizationService {
@@ -98,11 +108,11 @@ public class TestResourceLocalizationService {
public void testLocalizationInit() throws Exception {
final Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(null);
dispatcher.init(new Configuration());
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = spy(new DeletionService(exec));
delService.init(null);
delService.init(new Configuration());
delService.start();
AbstractFileSystem spylfs =
@@ -371,7 +381,7 @@ public class TestResourceLocalizationService {
DeletionService delServiceReal = new DeletionService(exec);
DeletionService delService = spy(delServiceReal);
delService.init(null);
delService.init(new Configuration());
delService.start();
ResourceLocalizationService rawService =

View File

@@ -131,6 +131,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.conf = conf;
this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
this.rmDispatcher = createDispatcher();
addIfService(this.rmDispatcher);
@@ -265,6 +267,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private final BlockingQueue<SchedulerEvent> eventQueue =
new LinkedBlockingQueue<SchedulerEvent>();
private final Thread eventProcessor;
private volatile boolean stopped = false;
public SchedulerEventDispatcher(ResourceScheduler scheduler) {
super(SchedulerEventDispatcher.class.getName());
@@ -285,7 +288,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
SchedulerEvent event;
while (!Thread.currentThread().isInterrupted()) {
while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
@@ -296,9 +299,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
try {
scheduler.handle(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
LOG.fatal("Error in handling event type " + event.getType()
+ " to the scheduler", t);
return; // TODO: Kill RM.
if (getConfig().getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR)) {
LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
}
}
@@ -306,6 +313,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
public synchronized void stop() {
this.stopped = true;
this.eventProcessor.interrupt();
try {
this.eventProcessor.join();