YARN-1726. ResourceSchedulerWrapper broken due to AbstractYarnScheduler. (Wei Yan via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1613550 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-07-26 01:55:03 +00:00
parent 5d0172fdf5
commit c2b0f62155
9 changed files with 323 additions and 94 deletions

View File

@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@ -133,8 +135,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
* register with RM * register with RM
*/ */
@Override @Override
public void firstStep() public void firstStep() throws Exception {
throws YarnException, IOException, InterruptedException {
simulateStartTimeMS = System.currentTimeMillis() - simulateStartTimeMS = System.currentTimeMillis() -
SLSRunner.getRunner().getStartTimeMS(); SLSRunner.getRunner().getStartTimeMS();
@ -149,8 +150,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
} }
@Override @Override
public void middleStep() public void middleStep() throws Exception {
throws InterruptedException, YarnException, IOException {
// process responses in the queue // process responses in the queue
processResponseQueue(); processResponseQueue();
@ -162,7 +162,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
} }
@Override @Override
public void lastStep() { public void lastStep() throws Exception {
LOG.info(MessageFormat.format("Application {0} is shutting down.", appId)); LOG.info(MessageFormat.format("Application {0} is shutting down.", appId));
// unregister tracking // unregister tracking
if (isTracked) { if (isTracked) {
@ -173,11 +173,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
.newRecordInstance(FinishApplicationMasterRequest.class); .newRecordInstance(FinishApplicationMasterRequest.class);
finishAMRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED); finishAMRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
try {
UserGroupInformation ugi = UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString()); UserGroupInformation.createRemoteUser(appAttemptId.toString());
Token<AMRMTokenIdentifier> token = Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps().get(appId)
rm.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
.getRMAppAttempt(appAttemptId).getAMRMToken(); .getRMAppAttempt(appAttemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier()); ugi.addTokenIdentifier(token.decodeIdentifier());
ugi.doAs(new PrivilegedExceptionAction<Object>() { ugi.doAs(new PrivilegedExceptionAction<Object>() {
@ -188,11 +186,6 @@ public abstract class AMSimulator extends TaskRunner.Task {
return null; return null;
} }
}); });
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
simulateFinishTimeMS = System.currentTimeMillis() - simulateFinishTimeMS = System.currentTimeMillis() -
SLSRunner.getRunner().getStartTimeMS(); SLSRunner.getRunner().getStartTimeMS();
@ -230,11 +223,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
return createAllocateRequest(ask, new ArrayList<ContainerId>()); return createAllocateRequest(ask, new ArrayList<ContainerId>());
} }
protected abstract void processResponseQueue() protected abstract void processResponseQueue() throws Exception;
throws InterruptedException, YarnException, IOException;
protected abstract void sendContainerRequest() protected abstract void sendContainerRequest() throws Exception;
throws YarnException, IOException, InterruptedException;
protected abstract void checkStop(); protected abstract void checkStop();
@ -280,11 +271,18 @@ public abstract class AMSimulator extends TaskRunner.Task {
// waiting until application ACCEPTED // waiting until application ACCEPTED
RMApp app = rm.getRMContext().getRMApps().get(appId); RMApp app = rm.getRMContext().getRMApps().get(appId);
while(app.getState() != RMAppState.ACCEPTED) { while(app.getState() != RMAppState.ACCEPTED) {
Thread.sleep(50); Thread.sleep(10);
} }
appAttemptId = rm.getRMContext().getRMApps().get(appId) // Waiting until application attempt reach LAUNCHED
// "Unmanaged AM must register after AM attempt reaches LAUNCHED state"
this.appAttemptId = rm.getRMContext().getRMApps().get(appId)
.getCurrentAppAttempt().getAppAttemptId(); .getCurrentAppAttempt().getAppAttemptId();
RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId)
.getCurrentAppAttempt();
while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) {
Thread.sleep(10);
}
} }
private void registerAM() private void registerAM()
@ -298,8 +296,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
UserGroupInformation ugi = UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString()); UserGroupInformation.createRemoteUser(appAttemptId.toString());
Token<AMRMTokenIdentifier> token = Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps().get(appId)
rm.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
.getRMAppAttempt(appAttemptId).getAMRMToken(); .getRMAppAttempt(appAttemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier()); ugi.addTokenIdentifier(token.decodeIdentifier());

View File

@ -145,8 +145,7 @@ public class MRAMSimulator extends AMSimulator {
} }
@Override @Override
public void firstStep() public void firstStep() throws Exception {
throws YarnException, IOException, InterruptedException {
super.firstStep(); super.firstStep();
requestAMContainer(); requestAMContainer();
@ -390,7 +389,7 @@ public class MRAMSimulator extends AMSimulator {
} }
@Override @Override
public void lastStep() { public void lastStep() throws Exception {
super.lastStep(); super.lastStep();
// clear data structures // clear data structures

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -107,12 +108,12 @@ public class NMSimulator extends TaskRunner.Task {
} }
@Override @Override
public void firstStep() throws YarnException, IOException { public void firstStep() {
// do nothing // do nothing
} }
@Override @Override
public void middleStep() { public void middleStep() throws Exception {
// we check the lifetime for each running containers // we check the lifetime for each running containers
ContainerSimulator cs = null; ContainerSimulator cs = null;
synchronized(completedContainerList) { synchronized(completedContainerList) {
@ -136,7 +137,6 @@ public class NMSimulator extends TaskRunner.Task {
ns.setResponseId(RESPONSE_ID ++); ns.setResponseId(RESPONSE_ID ++);
ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0)); ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
beatRequest.setNodeStatus(ns); beatRequest.setNodeStatus(ns);
try {
NodeHeartbeatResponse beatResponse = NodeHeartbeatResponse beatResponse =
rm.getResourceTrackerService().nodeHeartbeat(beatRequest); rm.getResourceTrackerService().nodeHeartbeat(beatRequest);
if (! beatResponse.getContainersToCleanup().isEmpty()) { if (! beatResponse.getContainersToCleanup().isEmpty()) {
@ -163,11 +163,6 @@ public class NMSimulator extends TaskRunner.Task {
if (beatResponse.getNodeAction() == NodeAction.SHUTDOWN) { if (beatResponse.getNodeAction() == NodeAction.SHUTDOWN) {
lastStep(); lastStep();
} }
} catch (YarnException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
} }
@Override @Override
@ -262,4 +257,19 @@ public class NMSimulator extends TaskRunner.Task {
completedContainerList.add(containerId); completedContainerList.add(containerId);
} }
} }
@VisibleForTesting
Map<ContainerId, ContainerSimulator> getRunningContainers() {
return runningContainers;
}
@VisibleForTesting
List<ContainerId> getAMContainers() {
return amContainerList;
}
@VisibleForTesting
List<ContainerId> getCompletedContainers() {
return completedContainerList;
}
} }

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@ -101,7 +102,6 @@ public class ResourceSchedulerWrapper
private static final String EOL = System.getProperty("line.separator"); private static final String EOL = System.getProperty("line.separator");
private static final int SAMPLING_SIZE = 60; private static final int SAMPLING_SIZE = 60;
private ScheduledExecutorService pool; private ScheduledExecutorService pool;
private RMContext rmContext;
// counters for scheduler allocate/handle operations // counters for scheduler allocate/handle operations
private Counter schedulerAllocateCounter; private Counter schedulerAllocateCounter;
private Counter schedulerHandleCounter; private Counter schedulerHandleCounter;
@ -577,7 +577,7 @@ public class ResourceSchedulerWrapper
new Gauge<Integer>() { new Gauge<Integer>() {
@Override @Override
public Integer getValue() { public Integer getValue() {
if(scheduler == null || scheduler.getRootQueueMetrics() == null) { if (scheduler == null || scheduler.getRootQueueMetrics() == null) {
return 0; return 0;
} else { } else {
return scheduler.getRootQueueMetrics().getAppsRunning(); return scheduler.getRootQueueMetrics().getAppsRunning();
@ -724,7 +724,7 @@ public class ResourceSchedulerWrapper
public void addAMRuntime(ApplicationId appId, public void addAMRuntime(ApplicationId appId,
long traceStartTimeMS, long traceEndTimeMS, long traceStartTimeMS, long traceEndTimeMS,
long simulateStartTimeMS, long simulateEndTimeMS) { long simulateStartTimeMS, long simulateEndTimeMS) {
if (metricsON) {
try { try {
// write job runtime information // write job runtime information
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
@ -737,6 +737,7 @@ public class ResourceSchedulerWrapper
e.printStackTrace(); e.printStackTrace();
} }
} }
}
private void updateQueueMetrics(String queue, private void updateQueueMetrics(String queue,
int releasedMemory, int releasedVCores) { int releasedMemory, int releasedVCores) {
@ -920,5 +921,18 @@ public class ResourceSchedulerWrapper
public Resource getClusterResource() { public Resource getClusterResource() {
return null; return null;
} }
@Override
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
return new ArrayList<Container>();
}
@Override
public Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>
getSchedulerApplications() {
return new HashMap<ApplicationId,
SchedulerApplication<SchedulerApplicationAttempt>>();
}
} }

View File

@ -99,12 +99,10 @@ public class TaskRunner {
} else { } else {
lastStep(); lastStep();
} }
} catch (YarnException e) { } catch (Exception e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
Thread.getDefaultUncaughtExceptionHandler()
.uncaughtException(Thread.currentThread(), e);
} }
} }
@ -124,13 +122,11 @@ public class TaskRunner {
} }
public abstract void firstStep() public abstract void firstStep() throws Exception;
throws YarnException, IOException, InterruptedException;
public abstract void middleStep() public abstract void middleStep() throws Exception;
throws YarnException, InterruptedException, IOException;
public abstract void lastStep() throws YarnException; public abstract void lastStep() throws Exception;
public void setEndTime(long et) { public void setEndTime(long et) {
endTime = et; endTime = et;

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.yarn.sls; package org.apache.hadoop.yarn.sls;
import org.apache.commons.io.FileUtils; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID; import java.util.UUID;
public class TestSLSRunner { public class TestSLSRunner {
@ -30,6 +33,15 @@ public class TestSLSRunner {
@SuppressWarnings("all") @SuppressWarnings("all")
public void testSimulatorRunning() throws Exception { public void testSimulatorRunning() throws Exception {
File tempDir = new File("target", UUID.randomUUID().toString()); File tempDir = new File("target", UUID.randomUUID().toString());
final List<Throwable> exceptionList =
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
exceptionList.add(e);
}
});
// start the simulator // start the simulator
File slsOutputDir = new File(tempDir.getAbsolutePath() + "/slsoutput/"); File slsOutputDir = new File(tempDir.getAbsolutePath() + "/slsoutput/");
@ -38,8 +50,20 @@ public class TestSLSRunner {
"-output", slsOutputDir.getAbsolutePath()}; "-output", slsOutputDir.getAbsolutePath()};
SLSRunner.main(args); SLSRunner.main(args);
// wait for 45 seconds before stop // wait for 20 seconds before stop
Thread.sleep(45 * 1000); int count = 20;
while (count >= 0) {
Thread.sleep(1000);
if (! exceptionList.isEmpty()) {
SLSRunner.getRunner().stop();
Assert.fail("TestSLSRunner catched exception from child thread " +
"(TaskRunner.Task): " + exceptionList.get(0).getMessage());
break;
}
count--;
}
SLSRunner.getRunner().stop(); SLSRunner.getRunner().stop();
} }

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.appmaster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * Smoke test for {@link AMSimulator}: drives a minimal simulator subclass
 * through its first and last steps against a live {@link ResourceManager}
 * and checks that the application shows up in the RM's application map.
 */
public class TestAMSimulator {
  private ResourceManager rm;
  private YarnConfiguration conf;

  /**
   * Starts a ResourceManager whose scheduler class is
   * {@code ResourceSchedulerWrapper}, with {@code FairScheduler} set under
   * the SLS scheduler key and the SLS metrics switch turned off.
   */
  @Before
  public void setup() {
    conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_SCHEDULER,
        "org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper");
    // NOTE(review): presumably the scheduler the wrapper delegates to -
    // confirm against ResourceSchedulerWrapper.
    conf.set(SLSConfiguration.RM_SCHEDULER,
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
    conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false);
    rm = new ResourceManager();
    rm.init(conf);
    rm.start();
  }

  /**
   * Simulator whose abstract hooks are all no-ops, so only the behavior
   * implemented in {@link AMSimulator} itself is exercised. Declared static
   * because it never touches the enclosing test instance.
   */
  static class MockAMSimulator extends AMSimulator {
    @Override
    protected void processResponseQueue()
        throws InterruptedException, YarnException, IOException {
      // intentionally empty
    }

    @Override
    protected void sendContainerRequest()
        throws YarnException, IOException, InterruptedException {
      // intentionally empty
    }

    @Override
    protected void checkStop() {
      // intentionally empty
    }
  }

  /**
   * Runs one application through {@code firstStep()} and {@code lastStep()}
   * and verifies that exactly one application is known to the RM afterwards.
   */
  @Test
  public void testAMSimulator() throws Exception {
    // Register one app
    MockAMSimulator app = new MockAMSimulator();
    List<ContainerSimulator> containers = new ArrayList<ContainerSimulator>();
    app.init(1, 1000, containers, rm, null, 0, 1000000L, "user1", "default",
        false, "app1");
    app.firstStep();
    Assert.assertEquals(1, rm.getRMContext().getRMApps().size());
    Assert.assertNotNull(rm.getRMContext().getRMApps().get(app.appId));

    // Finish this app
    app.lastStep();
  }

  /**
   * Stops the ResourceManager. JUnit runs this even when {@link #setup()}
   * threw, so a null {@code rm} is skipped to keep the original failure
   * visible instead of masking it with a NullPointerException.
   */
  @After
  public void tearDown() {
    if (rm != null) {
      rm.stop();
    }
  }
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.nodemanager;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Smoke test for {@link NMSimulator}: runs one simulated node against a live
 * {@link ResourceManager} and checks the node's container bookkeeping as
 * containers are added and cleaned up.
 */
public class TestNMSimulator {
  /** Multiplier for memory sizes; 10 * GB must equal the MB the RM reports. */
  private static final int GB = 1024;
  private ResourceManager rm;
  private YarnConfiguration conf;

  /**
   * Starts a ResourceManager whose scheduler class is
   * {@code ResourceSchedulerWrapper}, with {@code FairScheduler} set under
   * the SLS scheduler key and the SLS metrics switch turned off.
   */
  @Before
  public void setup() {
    conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_SCHEDULER,
        "org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper");
    // NOTE(review): presumably the scheduler the wrapper delegates to -
    // confirm against ResourceSchedulerWrapper.
    conf.set(SLSConfiguration.RM_SCHEDULER,
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
    conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false);
    rm = new ResourceManager();
    rm.init(conf);
    rm.start();
  }

  /**
   * Initializes one node, runs a single {@code middleStep()}, then adds and
   * cleans up a regular container and an AM container, asserting on the
   * node's running, AM and completed container collections after each step.
   */
  @Test
  public void testNMSimulator() throws Exception {
    // Register one node
    NMSimulator node1 = new NMSimulator();
    node1.init("rack1/node1", GB * 10, 10, 0, 1000, rm);
    node1.middleStep();

    // After the step the scheduler must see one node with the full capacity.
    Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
    Assert.assertEquals(GB * 10,
        rm.getResourceScheduler().getRootQueueMetrics().getAvailableMB());
    Assert.assertEquals(10,
        rm.getResourceScheduler().getRootQueueMetrics()
            .getAvailableVirtualCores());

    // Allocate one container on node1
    ContainerId cId1 = newContainerId(1, 1, 1);
    Container container1 = Container.newInstance(cId1, null, null,
        Resources.createResource(GB, 1), null, null);
    node1.addNewContainer(container1, 100000L);
    Assert.assertTrue("Node1 should have one running container.",
        node1.getRunningContainers().containsKey(cId1));

    // Allocate one AM container on node1
    // NOTE(review): the -1 second argument appears to mark the container as
    // an AM container - confirm against NMSimulator.addNewContainer.
    ContainerId cId2 = newContainerId(2, 1, 1);
    Container container2 = Container.newInstance(cId2, null, null,
        Resources.createResource(GB, 1), null, null);
    node1.addNewContainer(container2, -1L);
    Assert.assertTrue("Node1 should have one running AM container",
        node1.getAMContainers().contains(cId2));

    // Remove containers
    node1.cleanupContainer(cId1);
    Assert.assertTrue("Container1 should be removed from Node1.",
        node1.getCompletedContainers().contains(cId1));
    node1.cleanupContainer(cId2);
    Assert.assertFalse("Container2 should be removed from Node1.",
        node1.getAMContainers().contains(cId2));
  }

  /**
   * Builds a container id from the given numeric parts, using the current
   * wall-clock time as the application's cluster timestamp.
   *
   * @param appId numeric id of the application
   * @param appAttemptId numeric id of the application attempt
   * @param cId numeric id of the container within the attempt
   * @return the assembled container id
   */
  private static ContainerId newContainerId(int appId, int appAttemptId,
      int cId) {
    return BuilderUtils.newContainerId(
        BuilderUtils.newApplicationAttemptId(
            BuilderUtils.newApplicationId(System.currentTimeMillis(), appId),
            appAttemptId), cId);
  }

  /**
   * Stops the ResourceManager. JUnit runs this even when {@link #setup()}
   * threw, so a null {@code rm} is skipped to keep the original failure
   * visible instead of masking it with a NullPointerException.
   */
  @After
  public void tearDown() throws Exception {
    if (rm != null) {
      rm.stop();
    }
  }
}

View File

@ -431,6 +431,9 @@ Release 2.5.0 - UNRELEASED
YARN-2335. Annotate all hadoop-sls APIs as @Private. (Wei Yan via kasha) YARN-2335. Annotate all hadoop-sls APIs as @Private. (Wei Yan via kasha)
YARN-1726. ResourceSchedulerWrapper broken due to AbstractYarnScheduler.
(Wei Yan via kasha)
Release 2.4.1 - 2014-06-23 Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES