MAPREDUCE-3983. TestTTResourceReporting can fail, and should just be deleted (Ravi Prakash via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1308957 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4d876c7800
commit
8c8dbba8a6
|
@ -220,6 +220,9 @@ Release 0.23.3 - UNRELEASED
|
||||||
|
|
||||||
MAPREDUCE-4060. Multiple SLF4J binding warning (Jason Lowe via bobby)
|
MAPREDUCE-4060. Multiple SLF4J binding warning (Jason Lowe via bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-3983. TestTTResourceReporting can fail, and should just be
|
||||||
|
deleted (Ravi Prakash via bobby)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -1,364 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.mapred;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.mapreduce.SleepJob;
|
|
||||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
|
||||||
import org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin;
|
|
||||||
import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
|
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
|
||||||
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
|
||||||
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
|
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.After;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This test class tests the functionality related to configuring, reporting
|
|
||||||
* and computing memory related parameters in a Map/Reduce cluster.
|
|
||||||
*
|
|
||||||
* Each test sets up a {@link MiniMRCluster} with a locally defined
|
|
||||||
* {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates
|
|
||||||
* the memory related configuration is correctly computed and reported from
|
|
||||||
* the tasktracker in
|
|
||||||
* {@link org.apache.hadoop.mapred.TaskScheduler#assignTasks(TaskTrackerStatus)}.
|
|
||||||
*/
|
|
||||||
public class TestTTResourceReporting extends TestCase {
|
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(TestTTResourceReporting.class);
|
|
||||||
|
|
||||||
private MiniMRCluster miniMRCluster;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Fake scheduler to test the proper reporting of memory values by TT
|
|
||||||
*/
|
|
||||||
public static class FakeTaskScheduler extends JobQueueTaskScheduler {
|
|
||||||
|
|
||||||
private boolean hasPassed = true;
|
|
||||||
private boolean hasDynamicValuePassed = true;
|
|
||||||
private String message;
|
|
||||||
|
|
||||||
public FakeTaskScheduler() {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean hasTestPassed() {
|
|
||||||
return hasPassed;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean hasDynamicTestPassed() {
|
|
||||||
return hasDynamicValuePassed;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getFailureMessage() {
|
|
||||||
return message;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Task> assignTasks(TaskTracker taskTracker)
|
|
||||||
throws IOException {
|
|
||||||
TaskTrackerStatus status = taskTracker.getStatus();
|
|
||||||
long totalVirtualMemoryOnTT =
|
|
||||||
getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
|
|
||||||
long totalPhysicalMemoryOnTT =
|
|
||||||
getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
|
|
||||||
long mapSlotMemorySize =
|
|
||||||
getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
|
|
||||||
long reduceSlotMemorySize =
|
|
||||||
getConf()
|
|
||||||
.getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
|
|
||||||
long availableVirtualMemoryOnTT =
|
|
||||||
getConf().getLong("availableVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
|
|
||||||
long availablePhysicalMemoryOnTT =
|
|
||||||
getConf().getLong("availablePmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
|
|
||||||
long cumulativeCpuTime =
|
|
||||||
getConf().getLong("cumulativeCpuTime", TaskTrackerStatus.UNAVAILABLE);
|
|
||||||
long cpuFrequency =
|
|
||||||
getConf().getLong("cpuFrequency", TaskTrackerStatus.UNAVAILABLE);
|
|
||||||
int numProcessors =
|
|
||||||
getConf().getInt("numProcessors", TaskTrackerStatus.UNAVAILABLE);
|
|
||||||
float cpuUsage =
|
|
||||||
getConf().getFloat("cpuUsage", TaskTrackerStatus.UNAVAILABLE);
|
|
||||||
|
|
||||||
long reportedTotalVirtualMemoryOnTT =
|
|
||||||
status.getResourceStatus().getTotalVirtualMemory();
|
|
||||||
long reportedTotalPhysicalMemoryOnTT =
|
|
||||||
status.getResourceStatus().getTotalPhysicalMemory();
|
|
||||||
long reportedMapSlotMemorySize =
|
|
||||||
status.getResourceStatus().getMapSlotMemorySizeOnTT();
|
|
||||||
long reportedReduceSlotMemorySize =
|
|
||||||
status.getResourceStatus().getReduceSlotMemorySizeOnTT();
|
|
||||||
long reportedAvailableVirtualMemoryOnTT =
|
|
||||||
status.getResourceStatus().getAvailabelVirtualMemory();
|
|
||||||
long reportedAvailablePhysicalMemoryOnTT =
|
|
||||||
status.getResourceStatus().getAvailablePhysicalMemory();
|
|
||||||
long reportedCumulativeCpuTime =
|
|
||||||
status.getResourceStatus().getCumulativeCpuTime();
|
|
||||||
long reportedCpuFrequency = status.getResourceStatus().getCpuFrequency();
|
|
||||||
int reportedNumProcessors = status.getResourceStatus().getNumProcessors();
|
|
||||||
float reportedCpuUsage = status.getResourceStatus().getCpuUsage();
|
|
||||||
|
|
||||||
message =
|
|
||||||
"expected values : "
|
|
||||||
+ "(totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
|
|
||||||
+ "availableVirtualMemoryOnTT, availablePhysicalMemoryOnTT, "
|
|
||||||
+ "mapSlotMemSize, reduceSlotMemorySize, cumulativeCpuTime, "
|
|
||||||
+ "cpuFrequency, numProcessors, cpuUsage) = ("
|
|
||||||
+ totalVirtualMemoryOnTT + ", "
|
|
||||||
+ totalPhysicalMemoryOnTT + ","
|
|
||||||
+ availableVirtualMemoryOnTT + ", "
|
|
||||||
+ availablePhysicalMemoryOnTT + ","
|
|
||||||
+ mapSlotMemorySize + ","
|
|
||||||
+ reduceSlotMemorySize + ","
|
|
||||||
+ cumulativeCpuTime + ","
|
|
||||||
+ cpuFrequency + ","
|
|
||||||
+ numProcessors + ","
|
|
||||||
+ cpuUsage
|
|
||||||
+")";
|
|
||||||
message +=
|
|
||||||
"\nreported values : "
|
|
||||||
+ "(totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
|
|
||||||
+ "availableVirtualMemoryOnTT, availablePhysicalMemoryOnTT, "
|
|
||||||
+ "reportedMapSlotMemorySize, reportedReduceSlotMemorySize, "
|
|
||||||
+ "reportedCumulativeCpuTime, reportedCpuFrequency, "
|
|
||||||
+ "reportedNumProcessors, cpuUsage) = ("
|
|
||||||
+ reportedTotalVirtualMemoryOnTT + ", "
|
|
||||||
+ reportedTotalPhysicalMemoryOnTT + ","
|
|
||||||
+ reportedAvailableVirtualMemoryOnTT + ", "
|
|
||||||
+ reportedAvailablePhysicalMemoryOnTT + ","
|
|
||||||
+ reportedMapSlotMemorySize + ","
|
|
||||||
+ reportedReduceSlotMemorySize + ","
|
|
||||||
+ reportedCumulativeCpuTime + ","
|
|
||||||
+ reportedCpuFrequency + ","
|
|
||||||
+ reportedNumProcessors + ","
|
|
||||||
+ reportedCpuUsage
|
|
||||||
+ ")";
|
|
||||||
LOG.info(message);
|
|
||||||
hasDynamicValuePassed = true;
|
|
||||||
// Check task resource status in task reports
|
|
||||||
for (TaskStatus taskStatus : status.getTaskReports()) {
|
|
||||||
Counters counters = taskStatus.getCounters();
|
|
||||||
// This should be zero because the initial CPU time is subtracted.
|
|
||||||
long procCumulativeCpuTime = 0;
|
|
||||||
long procVirtualMemorySize =
|
|
||||||
getConf().getLong("procVirtualMemorySize", -1);
|
|
||||||
long procPhysicalMemorySize =
|
|
||||||
getConf().getLong("procPhysicalMemorySize", -1);
|
|
||||||
long reportedProcCumulativeCpuTime =
|
|
||||||
counters.findCounter(TaskCounter.CPU_MILLISECONDS).getValue();
|
|
||||||
long reportedProcVirtualMemorySize =
|
|
||||||
counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).getValue();
|
|
||||||
long reportedProcPhysicalMemorySize =
|
|
||||||
counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue();
|
|
||||||
String procMessage =
|
|
||||||
"expected values : "
|
|
||||||
+ "(procCumulativeCpuTime, procVirtualMemorySize,"
|
|
||||||
+ " procPhysicalMemorySize) = ("
|
|
||||||
+ procCumulativeCpuTime + ", "
|
|
||||||
+ procVirtualMemorySize + ", "
|
|
||||||
+ procPhysicalMemorySize + ")";
|
|
||||||
procMessage +=
|
|
||||||
"\nreported values : "
|
|
||||||
+ "(procCumulativeCpuTime, procVirtualMemorySize,"
|
|
||||||
+ " procPhysicalMemorySize) = ("
|
|
||||||
+ reportedProcCumulativeCpuTime + ", "
|
|
||||||
+ reportedProcVirtualMemorySize + ", "
|
|
||||||
+ reportedProcPhysicalMemorySize + ")";
|
|
||||||
LOG.info(procMessage);
|
|
||||||
message += "\n" + procMessage;
|
|
||||||
if (procCumulativeCpuTime != reportedProcCumulativeCpuTime ||
|
|
||||||
procVirtualMemorySize != reportedProcVirtualMemorySize ||
|
|
||||||
procPhysicalMemorySize != reportedProcPhysicalMemorySize) {
|
|
||||||
hasDynamicValuePassed = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
hasPassed = true;
|
|
||||||
if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
|
|
||||||
|| totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
|
|
||||||
|| mapSlotMemorySize != reportedMapSlotMemorySize
|
|
||||||
|| reduceSlotMemorySize != reportedReduceSlotMemorySize
|
|
||||||
|| numProcessors != reportedNumProcessors) {
|
|
||||||
hasPassed = false;
|
|
||||||
}
|
|
||||||
// These values changes every moment on the node so it can only be
|
|
||||||
// tested by DummyMemoryCalculatorPlugin. Need to check them separately
|
|
||||||
if (availableVirtualMemoryOnTT != reportedAvailableVirtualMemoryOnTT
|
|
||||||
|| availablePhysicalMemoryOnTT != reportedAvailablePhysicalMemoryOnTT
|
|
||||||
|| cumulativeCpuTime != reportedCumulativeCpuTime
|
|
||||||
|| cpuFrequency != reportedCpuFrequency
|
|
||||||
|| cpuUsage != reportedCpuUsage) {
|
|
||||||
hasDynamicValuePassed = false;
|
|
||||||
}
|
|
||||||
return super.assignTasks(taskTracker);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test that verifies default values are configured and reported correctly.
|
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testDefaultResourceValues()
|
|
||||||
throws Exception {
|
|
||||||
JobConf conf = new JobConf();
|
|
||||||
try {
|
|
||||||
// Memory values are disabled by default.
|
|
||||||
conf.setClass(
|
|
||||||
org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
|
|
||||||
DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
|
|
||||||
setUpCluster(conf);
|
|
||||||
JobConf jobConf = miniMRCluster.createJobConf();
|
|
||||||
jobConf.setClass(
|
|
||||||
org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
|
|
||||||
DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
|
|
||||||
runSleepJob(jobConf);
|
|
||||||
verifyTestResults();
|
|
||||||
} finally {
|
|
||||||
tearDownCluster();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test that verifies that configured values are reported correctly.
|
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testConfiguredResourceValues()
|
|
||||||
throws Exception {
|
|
||||||
JobConf conf = new JobConf();
|
|
||||||
conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
|
|
||||||
conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
|
|
||||||
conf.setLong("mapSlotMemorySize", 1 * 512L);
|
|
||||||
conf.setLong("reduceSlotMemorySize", 1 * 1024L);
|
|
||||||
conf.setLong("availableVmemOnTT", 4 * 1024 * 1024 * 1024L);
|
|
||||||
conf.setLong("availablePmemOnTT", 2 * 1024 * 1024 * 1024L);
|
|
||||||
conf.setLong("cumulativeCpuTime", 10000L);
|
|
||||||
conf.setLong("cpuFrequency", 2000000L);
|
|
||||||
conf.setInt("numProcessors", 8);
|
|
||||||
conf.setFloat("cpuUsage", 15.5F);
|
|
||||||
conf.setLong("procCumulativeCpuTime", 1000L);
|
|
||||||
conf.setLong("procVirtualMemorySize", 2 * 1024 * 1024 * 1024L);
|
|
||||||
conf.setLong("procPhysicalMemorySize", 1024 * 1024 * 1024L);
|
|
||||||
|
|
||||||
conf.setClass(
|
|
||||||
org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
|
|
||||||
DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
|
|
||||||
conf.setLong(DummyResourceCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
|
|
||||||
4 * 1024 * 1024 * 1024L);
|
|
||||||
conf.setLong(DummyResourceCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
|
|
||||||
2 * 1024 * 1024 * 1024L);
|
|
||||||
conf.setLong(MRConfig.MAPMEMORY_MB, 512L);
|
|
||||||
conf.setLong(MRConfig.REDUCEMEMORY_MB, 1024L);
|
|
||||||
conf.setLong(DummyResourceCalculatorPlugin.CUMULATIVE_CPU_TIME, 10000L);
|
|
||||||
conf.setLong(DummyResourceCalculatorPlugin.CPU_FREQUENCY, 2000000L);
|
|
||||||
conf.setInt(DummyResourceCalculatorPlugin.NUM_PROCESSORS, 8);
|
|
||||||
conf.setFloat(DummyResourceCalculatorPlugin.CPU_USAGE, 15.5F);
|
|
||||||
try {
|
|
||||||
setUpCluster(conf);
|
|
||||||
JobConf jobConf = miniMRCluster.createJobConf();
|
|
||||||
jobConf.setMemoryForMapTask(1 * 1024L);
|
|
||||||
jobConf.setMemoryForReduceTask(2 * 1024L);
|
|
||||||
jobConf.setClass(
|
|
||||||
org.apache.hadoop.mapreduce.server.tasktracker.TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
|
|
||||||
DummyResourceCalculatorPlugin.class, ResourceCalculatorPlugin.class);
|
|
||||||
jobConf.setLong(DummyResourceCalculatorPlugin.PROC_CUMULATIVE_CPU_TIME, 1000L);
|
|
||||||
jobConf.setLong(DummyResourceCalculatorPlugin.PROC_VMEM_TESTING_PROPERTY,
|
|
||||||
2 * 1024 * 1024 * 1024L);
|
|
||||||
jobConf.setLong(DummyResourceCalculatorPlugin.PROC_PMEM_TESTING_PROPERTY,
|
|
||||||
1024 * 1024 * 1024L);
|
|
||||||
runSleepJob(jobConf);
|
|
||||||
verifyTestResults();
|
|
||||||
} finally {
|
|
||||||
tearDownCluster();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test that verifies that total memory values are calculated and reported
|
|
||||||
* correctly.
|
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testResourceValuesOnLinux()
|
|
||||||
throws Exception {
|
|
||||||
if (!System.getProperty("os.name").startsWith("Linux")) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
JobConf conf = new JobConf();
|
|
||||||
LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin();
|
|
||||||
// In this case, we only check these three fields because they are static
|
|
||||||
conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
|
|
||||||
conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
|
|
||||||
conf.setLong("numProcessors", plugin.getNumProcessors());
|
|
||||||
|
|
||||||
try {
|
|
||||||
setUpCluster(conf);
|
|
||||||
runSleepJob(miniMRCluster.createJobConf());
|
|
||||||
verifyTestResults(true);
|
|
||||||
} finally {
|
|
||||||
tearDownCluster();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setUpCluster(JobConf conf)
|
|
||||||
throws Exception {
|
|
||||||
conf.setClass(JTConfig.JT_TASK_SCHEDULER,
|
|
||||||
TestTTResourceReporting.FakeTaskScheduler.class, TaskScheduler.class);
|
|
||||||
conf.set(JTConfig.JT_IPC_HANDLER_COUNT, "1");
|
|
||||||
miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void runSleepJob(JobConf conf) throws Exception {
|
|
||||||
String[] args = { "-m", "1", "-r", "1",
|
|
||||||
"-mt", "10", "-rt", "10" };
|
|
||||||
ToolRunner.run(conf, new SleepJob(), args);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void verifyTestResults() {
|
|
||||||
verifyTestResults(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void verifyTestResults(boolean excludeDynamic) {
|
|
||||||
FakeTaskScheduler scheduler =
|
|
||||||
(FakeTaskScheduler)miniMRCluster.getJobTrackerRunner().
|
|
||||||
getJobTracker().getTaskScheduler();
|
|
||||||
assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed());
|
|
||||||
if (!excludeDynamic) {
|
|
||||||
assertTrue(scheduler.getFailureMessage(),
|
|
||||||
scheduler.hasDynamicTestPassed());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
private void tearDownCluster() {
|
|
||||||
if (miniMRCluster != null) {
|
|
||||||
miniMRCluster.shutdown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue