diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index cbc39e5d740..a4ee8738022 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -86,6 +86,9 @@ Trunk (unreleased changes)
HADOOP-8070. Add a standalone benchmark for RPC call performance. (todd)
+ HADOOP-8084. Updates ProtoBufRpc engine to not do an unnecessary copy
+ for RPC request/response. (ddas)
+
BUG FIXES
HADOOP-8018. Hudson auto test for HDFS has started throwing javadoc
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 9b3862abdba..9a1a1615b0c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputOutputStream;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
@@ -45,6 +46,7 @@ import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto.Res
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -268,13 +270,13 @@ public class ProtobufRpcEngine implements RpcEngine {
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(message.toByteArray().length);
- out.write(message.toByteArray());
+ ((Message)message).writeDelimitedTo(
+ DataOutputOutputStream.constructOutputStream(out));
}
@Override
public void readFields(DataInput in) throws IOException {
- int length = in.readInt();
+ int length = ProtoUtil.readRawVarint32(in);
byte[] bytes = new byte[length];
in.readFully(bytes);
message = HadoopRpcRequestProto.parseFrom(bytes);
@@ -297,13 +299,13 @@ public class ProtobufRpcEngine implements RpcEngine {
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(message.toByteArray().length);
- out.write(message.toByteArray());
+ ((Message)message).writeDelimitedTo(
+ DataOutputOutputStream.constructOutputStream(out));
}
@Override
public void readFields(DataInput in) throws IOException {
- int length = in.readInt();
+ int length = ProtoUtil.readRawVarint32(in);
byte[] bytes = new byte[length];
in.readFully(bytes);
message = HadoopRpcResponseProto.parseFrom(bytes);
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c28c138924c..63cd2e37f13 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -108,6 +108,7 @@ Release 0.23.2 - UNRELEASED
OPTIMIZATIONS
BUG FIXES
+
MAPREDUCE-3862. Nodemanager can appear to hang on shutdown due to lingering
DeletionService threads (Jason Lowe via bobby)
@@ -125,6 +126,12 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3583. Change pid to String and stime to BigInteger in order to
avoid NumberFormatException caused by overflow. (Zhihong Yu via szetszwo)
+
+ MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when
+ their EventHandlers get exceptions. (vinodkv)
+
+ MAPREDUCE-3798. Fixed failing TestJobCleanup.testCusomCleanup() and moved it
+ to the maven build. (Ravi Prakash via vinodkv)
Release 0.23.1 - 2012-02-17
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 6c45574b7dc..0dddd66d59f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -188,6 +188,8 @@ public class MRAppMaster extends CompositeService {
@Override
public void init(final Configuration conf) {
+ conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
downloadTokensAndSetupUGI(conf);
context = new RunningAppContext(conf);
@@ -379,6 +381,7 @@ public class MRAppMaster extends CompositeService {
// this is the only job, so shut down the Appmaster
// note in a workflow scenario, this may lead to creation of a new
// job (FIXME?)
+ // Send job-end notification
if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
try {
LOG.info("Job end notification started for jobID : "
@@ -405,7 +408,6 @@ public class MRAppMaster extends CompositeService {
LOG.info("Calling stop for all the services");
stop();
- // Send job-end notification
} catch (Throwable t) {
LOG.warn("Graceful stop failed ", t);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
index 00bfc391ae4..ce720e1dea5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
@@ -61,7 +61,8 @@ public class ContainerLauncherEvent
@Override
public String toString() {
- return super.toString() + " for taskAttempt " + taskAttemptID;
+ return super.toString() + " for container " + containerID + " taskAttempt "
+ + taskAttemptID;
}
@Override
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
index 1443eed6089..0befad86427 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
@@ -334,7 +334,6 @@ public class ContainerLauncherImpl extends AbstractService implements
LOG.error("Returning, interrupted : " + e);
return;
}
-
int poolSize = launcherPool.getCorePoolSize();
// See if we need up the pool size only if haven't reached the
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
index 3bf6e075849..8e041e890ce 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
@@ -216,26 +216,18 @@ public class RecoveryService extends CompositeService implements Recovery {
protected Dispatcher createRecoveryDispatcher() {
return new RecoveryDispatcher();
}
-
- protected Dispatcher createRecoveryDispatcher(boolean exitOnException) {
- return new RecoveryDispatcher(exitOnException);
- }
@SuppressWarnings("rawtypes")
class RecoveryDispatcher extends AsyncDispatcher {
private final EventHandler actualHandler;
private final EventHandler handler;
- RecoveryDispatcher(boolean exitOnException) {
- super(exitOnException);
+ RecoveryDispatcher() {
+ super();
actualHandler = super.getEventHandler();
handler = new InterceptingEventHandler(actualHandler);
}
- RecoveryDispatcher() {
- this(false);
- }
-
@Override
@SuppressWarnings("unchecked")
public void dispatch(Event event) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 00bdaebfe80..325adacb300 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
index 1b35b21559a..60ec171c5f3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
@@ -49,6 +51,7 @@ import org.junit.Test;
* Tests the state machine with respect to Job/Task/TaskAttempt failure
* scenarios.
*/
+@SuppressWarnings("unchecked")
public class TestFail {
@Test
@@ -247,10 +250,17 @@ public class TestFail {
//when attempt times out, heartbeat handler will send the lost event
//leading to Attempt failure
return new TaskAttemptListenerImpl(getContext(), null) {
+ @Override
public void startRpcServer(){};
+ @Override
public void stopRpcServer(){};
+ @Override
+ public InetSocketAddress getAddress() {
+ return NetUtils.createSocketAddr("localhost", 1234);
+ }
public void init(Configuration conf) {
- conf.setInt("mapreduce.task.timeout", 1*1000);//reduce timeout
+ conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout
+ conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000);
super.init(conf);
}
};
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 509c49e9cc0..87fce7ece6b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -54,12 +54,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
-import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
-import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
@@ -724,13 +719,6 @@ public class TestRecovery {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
}
- @Override
- protected Recovery createRecoveryService(AppContext appContext) {
- return new RecoveryServiceWithCustomDispatcher(
- appContext.getApplicationAttemptId(), appContext.getClock(),
- getCommitter());
- }
-
@Override
protected ContainerLauncher createContainerLauncher(AppContext context) {
MockContainerLauncher launcher = new MockContainerLauncher() {
@@ -757,21 +745,6 @@ public class TestRecovery {
}
}
- static class RecoveryServiceWithCustomDispatcher extends RecoveryService {
-
- public RecoveryServiceWithCustomDispatcher(
- ApplicationAttemptId applicationAttemptId, Clock clock,
- OutputCommitter committer) {
- super(applicationAttemptId, clock, committer);
- }
-
- @Override
- public Dispatcher createRecoveryDispatcher() {
- return super.createRecoveryDispatcher(false);
- }
-
- }
-
public static void main(String[] arg) throws Exception {
TestRecovery test = new TestRecovery();
test.testCrashed();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
index f25de5c8cc2..b0ecb5c9bf7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -64,8 +65,6 @@ public class TestContainerLauncher {
appId, 3);
JobId jobId = MRBuilderUtils.newJobId(appId, 8);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP);
- TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
- ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10);
AppContext context = mock(AppContext.class);
CustomContainerLauncher containerLauncher = new CustomContainerLauncher(
@@ -83,6 +82,8 @@ public class TestContainerLauncher {
containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
for (int i = 0; i < 10; i++) {
+ ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, i);
+ TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host" + i + ":1234", null,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@@ -92,9 +93,21 @@ public class TestContainerLauncher {
Assert.assertNull(containerLauncher.foundErrors);
// Same set of hosts, so no change
- containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
containerLauncher.finishEventHandling = true;
+ int timeOut = 0;
+ while (containerLauncher.numEventsProcessed.get() < 10 && timeOut++ < 200) {
+ LOG.info("Waiting for number of events processed to become " + 10
+ + ". It is now " + containerLauncher.numEventsProcessed.get()
+ + ". Timeout is " + timeOut);
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
+ containerLauncher.finishEventHandling = false;
for (int i = 0; i < 10; i++) {
+ ContainerId containerId =
+ BuilderUtils.newContainerId(appAttemptId, i + 10);
+ TaskAttemptId taskAttemptId =
+ MRBuilderUtils.newTaskAttemptId(taskId, i + 10);
containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
containerId, "host" + i + ":1234", null,
ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@@ -106,14 +119,16 @@ public class TestContainerLauncher {
// Different hosts, there should be an increase in core-thread-pool size to
// 21(11hosts+10buffer)
// Core pool size should be 21 but the live pool size should be only 11.
- containerLauncher.expectedCorePoolSize = 12 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
- for (int i = 1; i <= 2; i++) {
- containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
- containerId, "host1" + i + ":1234", null,
- ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
- }
- waitForEvents(containerLauncher, 22);
- Assert.assertEquals(12, threadPool.getPoolSize());
+ containerLauncher.expectedCorePoolSize =
+ 11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
+ containerLauncher.finishEventHandling = false;
+ ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21);
+ TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21);
+ containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
+ containerId, "host11:1234", null,
+ ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
+ waitForEvents(containerLauncher, 21);
+ Assert.assertEquals(11, threadPool.getPoolSize());
Assert.assertNull(containerLauncher.foundErrors);
containerLauncher.stop();
@@ -172,15 +187,15 @@ public class TestContainerLauncher {
private void waitForEvents(CustomContainerLauncher containerLauncher,
int expectedNumEvents) throws InterruptedException {
- int timeOut = 20;
- while (expectedNumEvents != containerLauncher.numEventsProcessed
- || timeOut++ < 20) {
+ int timeOut = 0;
+ while (containerLauncher.numEventsProcessing.get() < expectedNumEvents
+ && timeOut++ < 20) {
LOG.info("Waiting for number of events to become " + expectedNumEvents
- + ". It is now " + containerLauncher.numEventsProcessed);
+ + ". It is now " + containerLauncher.numEventsProcessing.get());
Thread.sleep(1000);
}
- Assert
- .assertEquals(expectedNumEvents, containerLauncher.numEventsProcessed);
+ Assert.assertEquals(expectedNumEvents,
+ containerLauncher.numEventsProcessing.get());
}
@Test
@@ -244,9 +259,11 @@ public class TestContainerLauncher {
private final class CustomContainerLauncher extends ContainerLauncherImpl {
private volatile int expectedCorePoolSize = 0;
- private volatile int numEventsProcessed = 0;
+ private AtomicInteger numEventsProcessing = new AtomicInteger(0);
+ private AtomicInteger numEventsProcessed = new AtomicInteger(0);
private volatile String foundErrors = null;
private volatile boolean finishEventHandling;
+
private CustomContainerLauncher(AppContext context) {
super(context);
}
@@ -255,8 +272,38 @@ public class TestContainerLauncher {
return super.launcherPool;
}
+ private final class CustomEventProcessor extends
+ ContainerLauncherImpl.EventProcessor {
+ private final ContainerLauncherEvent event;
+
+ private CustomEventProcessor(ContainerLauncherEvent event) {
+ super(event);
+ this.event = event;
+ }
+
+ @Override
+ public void run() {
+ // do nothing substantial
+
+ LOG.info("Processing the event " + event.toString());
+
+ numEventsProcessing.incrementAndGet();
+ // Stall
+ while (!finishEventHandling) {
+ synchronized (this) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ ;
+ }
+ }
+ }
+ numEventsProcessed.incrementAndGet();
+ }
+ }
+
protected ContainerLauncherImpl.EventProcessor createEventProcessor(
- ContainerLauncherEvent event) {
+ final ContainerLauncherEvent event) {
// At this point of time, the EventProcessor is being created and so no
// additional threads would have been created.
@@ -266,23 +313,7 @@ public class TestContainerLauncher {
+ launcherPool.getCorePoolSize();
}
- return new ContainerLauncherImpl.EventProcessor(event) {
- @Override
- public void run() {
- // do nothing substantial
- numEventsProcessed++;
- // Stall
- synchronized(this) {
- try {
- while(!finishEventHandling) {
- wait(1000);
- }
- } catch (InterruptedException e) {
- ;
- }
- }
- }
- };
+ return new CustomEventProcessor(event);
}
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
index c1626f9531e..b420e3dfbe3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.service.CompositeService;
/******************************************************************
@@ -51,6 +52,9 @@ public class JobHistoryServer extends CompositeService {
@Override
public synchronized void init(Configuration conf) {
Configuration config = new YarnConfiguration(conf);
+
+ config.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
try {
doSecureLogin(conf);
} catch(IOException ie) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
index ec2fe5ea3ae..58ab9c8117e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
@@ -53,7 +53,6 @@ public class TestJobHistoryEvents {
@Test
public void testHistoryEvents() throws Exception {
Configuration conf = new Configuration();
- conf.set(MRJobConfig.USER_NAME, "test");
MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
@@ -102,7 +101,6 @@ public class TestJobHistoryEvents {
public void testEventsFlushOnStop() throws Exception {
Configuration conf = new Configuration();
- conf.set(MRJobConfig.USER_NAME, "test");
MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
.getClass().getName(), true);
app.submit(conf);
diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java
similarity index 55%
rename from hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java
index 3635892d639..6139fdb05f1 100644
--- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobCleanup.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java
@@ -22,10 +22,8 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -33,64 +31,71 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
/**
* A JUnit test to test Map-Reduce job cleanup.
*/
-public class TestJobCleanup extends TestCase {
- private static String TEST_ROOT_DIR =
- new File(System.getProperty("test.build.data", "/tmp") + "/"
- + "test-job-cleanup").toString();
- private static final String CUSTOM_CLEANUP_FILE_NAME =
- "_custom_cleanup";
- private static final String ABORT_KILLED_FILE_NAME =
- "_custom_abort_killed";
- private static final String ABORT_FAILED_FILE_NAME =
- "_custom_abort_failed";
+@SuppressWarnings("deprecation")
+public class TestJobCleanup {
+ private static String TEST_ROOT_DIR = new File(System.getProperty(
+ "test.build.data", "/tmp") + "/" + "test-job-cleanup").toString();
+ private static final String CUSTOM_CLEANUP_FILE_NAME = "_custom_cleanup";
+ private static final String ABORT_KILLED_FILE_NAME = "_custom_abort_killed";
+ private static final String ABORT_FAILED_FILE_NAME = "_custom_abort_failed";
private static FileSystem fileSys = null;
private static MiniMRCluster mr = null;
private static Path inDir = null;
private static Path emptyInDir = null;
private static int outDirs = 0;
-
- public static Test suite() {
- TestSetup setup = new TestSetup(new TestSuite(TestJobCleanup.class)) {
- protected void setUp() throws Exception {
- JobConf conf = new JobConf();
- fileSys = FileSystem.get(conf);
- fileSys.delete(new Path(TEST_ROOT_DIR), true);
- conf.set("mapred.job.tracker.handler.count", "1");
- conf.set("mapred.job.tracker", "127.0.0.1:0");
- conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
- conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
- mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
- inDir = new Path(TEST_ROOT_DIR, "test-input");
- String input = "The quick brown fox\n" + "has many silly\n"
- + "red fox sox\n";
- DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0));
- file.writeBytes(input);
- file.close();
- emptyInDir = new Path(TEST_ROOT_DIR, "empty-input");
- fileSys.mkdirs(emptyInDir);
- }
-
- protected void tearDown() throws Exception {
- if (fileSys != null) {
- fileSys.delete(new Path(TEST_ROOT_DIR), true);
- fileSys.close();
- }
- if (mr != null) {
- mr.shutdown();
- }
- }
- };
- return setup;
+ private static Log LOG = LogFactory.getLog(TestJobCleanup.class);
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ JobConf conf = new JobConf();
+ fileSys = FileSystem.get(conf);
+ fileSys.delete(new Path(TEST_ROOT_DIR), true);
+ conf.set("mapred.job.tracker.handler.count", "1");
+ conf.set("mapred.job.tracker", "127.0.0.1:0");
+ conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+ conf.set("mapred.task.tracker.http.address", "127.0.0.1:0");
+ conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, TEST_ROOT_DIR +
+ "/intermediate");
+ conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+ .SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, "true");
+
+ mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
+ inDir = new Path(TEST_ROOT_DIR, "test-input");
+ String input = "The quick brown fox\n" + "has many silly\n"
+ + "red fox sox\n";
+ DataOutputStream file = fileSys.create(new Path(inDir, "part-" + 0));
+ file.writeBytes(input);
+ file.close();
+ emptyInDir = new Path(TEST_ROOT_DIR, "empty-input");
+ fileSys.mkdirs(emptyInDir);
}
-
- /**
- * Committer with deprecated {@link FileOutputCommitter#cleanupJob(JobContext)}
- * making a _failed/_killed in the output folder
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (fileSys != null) {
+ // fileSys.delete(new Path(TEST_ROOT_DIR), true);
+ fileSys.close();
+ }
+ if (mr != null) {
+ mr.shutdown();
+ }
+ }
+
+ /**
+ * Committer with deprecated
+ * {@link FileOutputCommitter#cleanupJob(JobContext)} making a _failed/_killed
+ * in the output folder
*/
static class CommitterWithCustomDeprecatedCleanup extends FileOutputCommitter {
@Override
@@ -101,31 +106,40 @@ public class TestJobCleanup extends TestCase {
FileSystem fs = outputPath.getFileSystem(conf);
fs.create(new Path(outputPath, CUSTOM_CLEANUP_FILE_NAME)).close();
}
+
+ @Override
+ public void commitJob(JobContext context) throws IOException {
+ cleanupJob(context);
+ }
+
+ @Override
+ public void abortJob(JobContext context, int i) throws IOException {
+ cleanupJob(context);
+ }
}
-
- /**
+
+ /**
* Committer with abort making a _failed/_killed in the output folder
*/
static class CommitterWithCustomAbort extends FileOutputCommitter {
@Override
- public void abortJob(JobContext context, int state)
- throws IOException {
- JobConf conf = context.getJobConf();;
+ public void abortJob(JobContext context, int state) throws IOException {
+ JobConf conf = context.getJobConf();
+ ;
Path outputPath = FileOutputFormat.getOutputPath(conf);
FileSystem fs = outputPath.getFileSystem(conf);
- String fileName = (state == JobStatus.FAILED)
- ? TestJobCleanup.ABORT_FAILED_FILE_NAME
- : TestJobCleanup.ABORT_KILLED_FILE_NAME;
+ String fileName = (state == JobStatus.FAILED) ? TestJobCleanup.ABORT_FAILED_FILE_NAME
+ : TestJobCleanup.ABORT_KILLED_FILE_NAME;
fs.create(new Path(outputPath, fileName)).close();
}
}
-
+
private Path getNewOutputDir() {
return new Path(TEST_ROOT_DIR, "output-" + outDirs++);
}
-
- private void configureJob(JobConf jc, String jobName, int maps, int reds,
- Path outDir) {
+
+ private void configureJob(JobConf jc, String jobName, int maps, int reds,
+ Path outDir) {
jc.setJobName(jobName);
jc.setInputFormat(TextInputFormat.class);
jc.setOutputKeyClass(LongWritable.class);
@@ -137,36 +151,38 @@ public class TestJobCleanup extends TestCase {
jc.setNumMapTasks(maps);
jc.setNumReduceTasks(reds);
}
-
+
// run a job with 1 map and let it run to completion
- private void testSuccessfulJob(String filename,
- Class extends OutputCommitter> committer, String[] exclude)
- throws IOException {
+ private void testSuccessfulJob(String filename,
+ Class extends OutputCommitter> committer, String[] exclude)
+ throws IOException {
JobConf jc = mr.createJobConf();
Path outDir = getNewOutputDir();
configureJob(jc, "job with cleanup()", 1, 0, outDir);
jc.setOutputCommitter(committer);
-
+
JobClient jobClient = new JobClient(jc);
RunningJob job = jobClient.submitJob(jc);
JobID id = job.getID();
job.waitForCompletion();
-
+
+ LOG.info("Job finished : " + job.isComplete());
Path testFile = new Path(outDir, filename);
- assertTrue("Done file missing for job " + id, fileSys.exists(testFile));
-
+ assertTrue("Done file \"" + testFile + "\" missing for job " + id,
+ fileSys.exists(testFile));
+
// check if the files from the missing set exists
for (String ex : exclude) {
Path file = new Path(outDir, ex);
- assertFalse("File " + file + " should not be present for successful job "
- + id, fileSys.exists(file));
+ assertFalse("File " + file + " should not be present for successful job "
+ + id, fileSys.exists(file));
}
}
-
+
// run a job for which all the attempts simply fail.
- private void testFailedJob(String fileName,
- Class extends OutputCommitter> committer, String[] exclude)
- throws IOException {
+ private void testFailedJob(String fileName,
+ Class extends OutputCommitter> committer, String[] exclude)
+ throws IOException {
JobConf jc = mr.createJobConf();
Path outDir = getNewOutputDir();
configureJob(jc, "fail job with abort()", 1, 0, outDir);
@@ -179,128 +195,129 @@ public class TestJobCleanup extends TestCase {
RunningJob job = jobClient.submitJob(jc);
JobID id = job.getID();
job.waitForCompletion();
-
+
if (fileName != null) {
Path testFile = new Path(outDir, fileName);
- assertTrue("File " + testFile + " missing for failed job " + id,
- fileSys.exists(testFile));
+ assertTrue("File " + testFile + " missing for failed job " + id,
+ fileSys.exists(testFile));
}
-
+
// check if the files from the missing set exists
for (String ex : exclude) {
Path file = new Path(outDir, ex);
assertFalse("File " + file + " should not be present for failed job "
- + id, fileSys.exists(file));
+ + id, fileSys.exists(file));
}
}
-
+
// run a job which gets stuck in mapper and kill it.
private void testKilledJob(String fileName,
- Class extends OutputCommitter> committer, String[] exclude)
- throws IOException {
+ Class extends OutputCommitter> committer, String[] exclude)
+ throws IOException {
JobConf jc = mr.createJobConf();
Path outDir = getNewOutputDir();
configureJob(jc, "kill job with abort()", 1, 0, outDir);
// set the job to wait for long
jc.setMapperClass(UtilsForTests.KillMapper.class);
jc.setOutputCommitter(committer);
-
+
JobClient jobClient = new JobClient(jc);
RunningJob job = jobClient.submitJob(jc);
JobID id = job.getID();
- JobInProgress jip =
- mr.getJobTrackerRunner().getJobTracker().getJob(job.getID());
-
+
+ Counters counters = job.getCounters();
+
// wait for the map to be launched
while (true) {
- if (jip.runningMaps() == 1) {
+ if (counters.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS) == 1) {
break;
}
+ LOG.info("Waiting for a map task to be launched");
UtilsForTests.waitFor(100);
+ counters = job.getCounters();
}
-
+
job.killJob(); // kill the job
-
+
job.waitForCompletion(); // wait for the job to complete
-
+
if (fileName != null) {
Path testFile = new Path(outDir, fileName);
- assertTrue("File " + testFile + " missing for job " + id,
- fileSys.exists(testFile));
+ assertTrue("File " + testFile + " missing for job " + id,
+ fileSys.exists(testFile));
}
-
+
// check if the files from the missing set exists
for (String ex : exclude) {
Path file = new Path(outDir, ex);
assertFalse("File " + file + " should not be present for killed job "
- + id, fileSys.exists(file));
+ + id, fileSys.exists(file));
}
}
-
+
/**
* Test default cleanup/abort behavior
- *
+ *
* @throws IOException
*/
+ @Test
public void testDefaultCleanupAndAbort() throws IOException {
// check with a successful job
testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
- FileOutputCommitter.class,
- new String[] {});
-
+ FileOutputCommitter.class, new String[] {});
+
// check with a failed job
- testFailedJob(null,
- FileOutputCommitter.class,
- new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
-
+ testFailedJob(null, FileOutputCommitter.class,
+ new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
+
// check default abort job kill
- testKilledJob(null,
- FileOutputCommitter.class,
- new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+ testKilledJob(null, FileOutputCommitter.class,
+ new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME });
}
-
+
/**
* Test if a failed job with custom committer runs the abort code.
- *
+ *
* @throws IOException
*/
+ @Test
public void testCustomAbort() throws IOException {
// check with a successful job
- testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
- CommitterWithCustomAbort.class,
- new String[] {ABORT_FAILED_FILE_NAME,
- ABORT_KILLED_FILE_NAME});
-
+ testSuccessfulJob(FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ CommitterWithCustomAbort.class, new String[] { ABORT_FAILED_FILE_NAME,
+ ABORT_KILLED_FILE_NAME });
+
// check with a failed job
- testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class,
- new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
- ABORT_KILLED_FILE_NAME});
-
+ testFailedJob(ABORT_FAILED_FILE_NAME, CommitterWithCustomAbort.class,
+ new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ ABORT_KILLED_FILE_NAME });
+
// check with a killed job
- testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class,
- new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME,
- ABORT_FAILED_FILE_NAME});
+ testKilledJob(ABORT_KILLED_FILE_NAME, CommitterWithCustomAbort.class,
+ new String[] { FileOutputCommitter.SUCCEEDED_FILE_NAME,
+ ABORT_FAILED_FILE_NAME });
}
/**
* Test if a failed job with custom committer runs the deprecated
- * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api
+ * {@link FileOutputCommitter#cleanupJob(JobContext)} code for api
* compatibility testing.
*/
+ @Test
public void testCustomCleanup() throws IOException {
// check with a successful job
- testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME,
- CommitterWithCustomDeprecatedCleanup.class,
- new String[] {});
-
- // check with a failed job
- testFailedJob(CUSTOM_CLEANUP_FILE_NAME,
- CommitterWithCustomDeprecatedCleanup.class,
- new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
-
- // check with a killed job
- testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME,
- CommitterWithCustomDeprecatedCleanup.class,
- new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+ testSuccessfulJob(CUSTOM_CLEANUP_FILE_NAME,
+ CommitterWithCustomDeprecatedCleanup.class,
+ new String[] {});
+
+ // check with a failed job
+ testFailedJob(CUSTOM_CLEANUP_FILE_NAME,
+ CommitterWithCustomDeprecatedCleanup.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
+
+ // check with a killed job
+ testKilledJob(TestJobCleanup.CUSTOM_CLEANUP_FILE_NAME,
+ CommitterWithCustomDeprecatedCleanup.class,
+ new String[] {FileOutputCommitter.SUCCEEDED_FILE_NAME});
}
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 4b17c1e943e..799e20092d4 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -193,6 +193,12 @@
+
+
+
+
+
+
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index ffa2e9cfc60..1b3a76a4778 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -48,22 +49,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
private boolean exitOnDispatchException;
public AsyncDispatcher() {
- this(new HashMap, EventHandler>(),
- new LinkedBlockingQueue(), true);
- }
-
- public AsyncDispatcher(boolean exitOnException) {
- this(new HashMap, EventHandler>(),
- new LinkedBlockingQueue(), exitOnException);
+ this(new LinkedBlockingQueue());
}
- AsyncDispatcher(
- Map, EventHandler> eventDispatchers,
- BlockingQueue eventQueue, boolean exitOnException) {
+ public AsyncDispatcher(BlockingQueue eventQueue) {
super("Dispatcher");
this.eventQueue = eventQueue;
- this.eventDispatchers = eventDispatchers;
- this.exitOnDispatchException = exitOnException;
+ this.eventDispatchers = new HashMap, EventHandler>();
}
Runnable createThread() {
@@ -86,6 +78,14 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
};
}
+ @Override
+ public synchronized void init(Configuration conf) {
+ this.exitOnDispatchException =
+ conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+ Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+ super.init(conf);
+ }
+
@Override
public void start() {
//start all the components
@@ -103,7 +103,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
try {
eventHandlingThread.join();
} catch (InterruptedException ie) {
- LOG.debug("Interrupted Exception while stopping", ie);
+ LOG.warn("Interrupted Exception while stopping", ie);
}
}
@@ -126,8 +126,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
catch (Throwable t) {
//TODO Maybe log the state of the queue
- LOG.fatal("Error in dispatcher thread. Exiting..", t);
+ LOG.fatal("Error in dispatcher thread", t);
if (exitOnDispatchException) {
+ LOG.info("Exiting, bbye..");
System.exit(-1);
}
}
@@ -177,6 +178,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
+ LOG.warn("AsyncDispatcher thread interrupted", e);
throw new YarnException(e);
}
};
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java
index 40c07755a98..f87f1b26c78 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java
@@ -23,8 +23,18 @@ package org.apache.hadoop.yarn.event;
* event handlers based on event types.
*
*/
+@SuppressWarnings("rawtypes")
public interface Dispatcher {
+ // Configuration to make sure dispatcher crashes but doesn't do system-exit in
+ // case of errors. By default, it should be false, so that tests are not
+ // affected. For all daemons it should be explicitly set to true so that
+ // daemons can crash instead of hanging around.
+ public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
+ "yarn.dispatcher.exit-on-error";
+
+ public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
+
EventHandler getEventHandler();
void register(Class extends Enum> eventType, EventHandler handler);
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
index 20d7dfca94c..e79e7b360ef 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
@@ -17,10 +17,8 @@
*/
package org.apache.hadoop.yarn.event;
-import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
@SuppressWarnings("rawtypes")
public class DrainDispatcher extends AsyncDispatcher {
@@ -36,7 +34,7 @@ public class DrainDispatcher extends AsyncDispatcher {
}
private DrainDispatcher(BlockingQueue eventQueue) {
- super(new HashMap, EventHandler>(), eventQueue, true);
+ super(eventQueue);
this.queue = eventQueue;
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 439b5e37a57..8f875e117ae 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -99,6 +99,8 @@ public class NodeManager extends CompositeService implements
@Override
public void init(Configuration conf) {
+ conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
Context context = new NMContext();
// Create the secretManager if need be.
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 3169f2f1b8a..1eb22837939 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -137,6 +137,7 @@ public class ContainerManagerImpl extends CompositeService implements
this.context = context;
this.dirsHandler = dirsHandler;
+ // ContainerManager level dispatcher.
dispatcher = new AsyncDispatcher();
this.deletionService = deletionContext;
this.metrics = metrics;
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 236489b7f4c..4b3736024b1 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -376,7 +376,7 @@ public class TestApplication {
WrappedApplication(int id, long timestamp, String user, int numContainers) {
dispatcher = new DrainDispatcher();
- dispatcher.init(null);
+ dispatcher.init(new Configuration());
localizerBus = mock(EventHandler.class);
launcherBus = mock(EventHandler.class);
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index e4b7aa47a7a..bc6ec196e17 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -517,7 +517,7 @@ public class TestContainer {
WrappedContainer(int appId, long timestamp, int id, String user,
boolean withLocalRes, boolean withServiceData) {
dispatcher = new DrainDispatcher();
- dispatcher.init(null);
+ dispatcher.init(new Configuration());
localizerBus = mock(EventHandler.class);
launcherBus = mock(EventHandler.class);
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
index a1c6bb84793..07d8df1db6c 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
@@ -17,6 +17,15 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -29,19 +38,14 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
-
import org.junit.Test;
-import static org.junit.Assert.*;
-
import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;
public class TestLocalizedResource {
@@ -62,7 +66,7 @@ public class TestLocalizedResource {
@SuppressWarnings("unchecked") // mocked generic
public void testNotification() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher();
- dispatcher.init(null);
+ dispatcher.init(new Configuration());
try {
dispatcher.start();
EventHandler containerBus = mock(EventHandler.class);
@@ -175,7 +179,7 @@ public class TestLocalizedResource {
@Test
public void testDirectLocalization() throws Exception {
DrainDispatcher dispatcher = new DrainDispatcher();
- dispatcher.init(null);
+ dispatcher.init(new Configuration());
try {
dispatcher.start();
LocalResource apiRsrc = createMockResource();
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index 9886d37c73b..84cd7f22b22 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -18,8 +18,23 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
-import java.net.InetSocketAddress;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyShort;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -33,7 +48,6 @@ import java.util.Set;
import junit.framework.Assert;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +56,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -60,7 +75,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
-import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -81,13 +95,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
-
import org.junit.Test;
-import static org.junit.Assert.*;
-
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;
public class TestResourceLocalizationService {
@@ -98,11 +108,11 @@ public class TestResourceLocalizationService {
public void testLocalizationInit() throws Exception {
final Configuration conf = new Configuration();
AsyncDispatcher dispatcher = new AsyncDispatcher();
- dispatcher.init(null);
+ dispatcher.init(new Configuration());
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = spy(new DeletionService(exec));
- delService.init(null);
+ delService.init(new Configuration());
delService.start();
AbstractFileSystem spylfs =
@@ -371,7 +381,7 @@ public class TestResourceLocalizationService {
DeletionService delServiceReal = new DeletionService(exec);
DeletionService delService = spy(delServiceReal);
- delService.init(null);
+ delService.init(new Configuration());
delService.start();
ResourceLocalizationService rawService =
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index dc7d29cdcbc..bf01cef0058 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -131,6 +131,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.conf = conf;
+ this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
this.rmDispatcher = createDispatcher();
addIfService(this.rmDispatcher);
@@ -265,6 +267,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private final BlockingQueue eventQueue =
new LinkedBlockingQueue();
private final Thread eventProcessor;
+ private volatile boolean stopped = false;
public SchedulerEventDispatcher(ResourceScheduler scheduler) {
super(SchedulerEventDispatcher.class.getName());
@@ -285,7 +288,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
SchedulerEvent event;
- while (!Thread.currentThread().isInterrupted()) {
+ while (!stopped && !Thread.currentThread().isInterrupted()) {
try {
event = eventQueue.take();
} catch (InterruptedException e) {
@@ -296,9 +299,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
try {
scheduler.handle(event);
} catch (Throwable t) {
- LOG.error("Error in handling event type " + event.getType()
+ LOG.fatal("Error in handling event type " + event.getType()
+ " to the scheduler", t);
- return; // TODO: Kill RM.
+ if (getConfig().getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+ Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR)) {
+ LOG.info("Exiting, bbye..");
+ System.exit(-1);
+ }
}
}
}
@@ -306,6 +313,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
public synchronized void stop() {
+ this.stopped = true;
this.eventProcessor.interrupt();
try {
this.eventProcessor.join();