MAPREDUCE-5608. Replace and deprecate mapred.tasktracker.indexcache.mb (#5014)
Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com> Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
04b31d7ecf
commit
a48e8c9beb
|
@ -208,7 +208,8 @@ The following table lists the configuration property names that are deprecated i
|
||||||
| mapred.task.profile.params | mapreduce.task.profile.params |
|
| mapred.task.profile.params | mapreduce.task.profile.params |
|
||||||
| mapred.task.profile.reduces | mapreduce.task.profile.reduces |
|
| mapred.task.profile.reduces | mapreduce.task.profile.reduces |
|
||||||
| mapred.task.timeout | mapreduce.task.timeout |
|
| mapred.task.timeout | mapreduce.task.timeout |
|
||||||
| mapred.tasktracker.indexcache.mb | mapreduce.tasktracker.indexcache.mb |
|
| mapred.tasktracker.indexcache.mb | mapreduce.reduce.shuffle.indexcache.mb |
|
||||||
|
| mapreduce.tasktracker.indexcache.mb | mapreduce.reduce.shuffle.indexcache.mb |
|
||||||
| mapred.tasktracker.map.tasks.maximum | mapreduce.tasktracker.map.tasks.maximum |
|
| mapred.tasktracker.map.tasks.maximum | mapreduce.tasktracker.map.tasks.maximum |
|
||||||
| mapred.tasktracker.memory\_calculator\_plugin | mapreduce.tasktracker.resourcecalculatorplugin |
|
| mapred.tasktracker.memory\_calculator\_plugin | mapreduce.tasktracker.resourcecalculatorplugin |
|
||||||
| mapred.tasktracker.memorycalculatorplugin | mapreduce.tasktracker.resourcecalculatorplugin |
|
| mapred.tasktracker.memorycalculatorplugin | mapreduce.tasktracker.resourcecalculatorplugin |
|
||||||
|
|
|
@ -23,7 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ class IndexCache {
|
||||||
public IndexCache(JobConf conf) {
|
public IndexCache(JobConf conf) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
totalMemoryAllowed =
|
totalMemoryAllowed =
|
||||||
conf.getInt(TTConfig.TT_INDEX_CACHE, 10) * 1024 * 1024;
|
conf.getInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 10) * 1024 * 1024;
|
||||||
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
|
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -577,6 +577,8 @@ public interface MRJobConfig {
|
||||||
public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = "mapreduce.reduce.shuffle.max-host-failures";
|
public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = "mapreduce.reduce.shuffle.max-host-failures";
|
||||||
public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5;
|
public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5;
|
||||||
|
|
||||||
|
public static final String SHUFFLE_INDEX_CACHE = "mapreduce.reduce.shuffle.indexcache.mb";
|
||||||
|
|
||||||
public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
|
public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
|
||||||
|
|
||||||
public static final String REDUCE_SKIP_MAXGROUPS = "mapreduce.reduce.skip.maxgroups";
|
public static final String REDUCE_SKIP_MAXGROUPS = "mapreduce.reduce.skip.maxgroups";
|
||||||
|
|
|
@ -29,6 +29,12 @@ import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public interface TTConfig extends MRConfig {
|
public interface TTConfig extends MRConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Use
|
||||||
|
* {@link org.apache.hadoop.mapreduce.MRJobConfig#SHUFFLE_INDEX_CACHE}
|
||||||
|
* instead
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public static final String TT_INDEX_CACHE =
|
public static final String TT_INDEX_CACHE =
|
||||||
"mapreduce.tasktracker.indexcache.mb";
|
"mapreduce.tasktracker.indexcache.mb";
|
||||||
public static final String TT_MAP_SLOTS =
|
public static final String TT_MAP_SLOTS =
|
||||||
|
|
|
@ -80,8 +80,6 @@ public class ConfigUtil {
|
||||||
JTConfig.JT_TASKCACHE_LEVELS),
|
JTConfig.JT_TASKCACHE_LEVELS),
|
||||||
new DeprecationDelta("mapred.job.tracker.retire.jobs",
|
new DeprecationDelta("mapred.job.tracker.retire.jobs",
|
||||||
JTConfig.JT_RETIREJOBS),
|
JTConfig.JT_RETIREJOBS),
|
||||||
new DeprecationDelta("mapred.tasktracker.indexcache.mb",
|
|
||||||
TTConfig.TT_INDEX_CACHE),
|
|
||||||
new DeprecationDelta("mapred.tasktracker.map.tasks.maximum",
|
new DeprecationDelta("mapred.tasktracker.map.tasks.maximum",
|
||||||
TTConfig.TT_MAP_SLOTS),
|
TTConfig.TT_MAP_SLOTS),
|
||||||
new DeprecationDelta("mapred.tasktracker.memory_calculator_plugin",
|
new DeprecationDelta("mapred.tasktracker.memory_calculator_plugin",
|
||||||
|
@ -290,6 +288,10 @@ public class ConfigUtil {
|
||||||
MRJobConfig.REDUCE_LOG_LEVEL),
|
MRJobConfig.REDUCE_LOG_LEVEL),
|
||||||
new DeprecationDelta("mapreduce.job.counters.limit",
|
new DeprecationDelta("mapreduce.job.counters.limit",
|
||||||
MRJobConfig.COUNTERS_MAX_KEY),
|
MRJobConfig.COUNTERS_MAX_KEY),
|
||||||
|
new DeprecationDelta("mapred.tasktracker.indexcache.mb",
|
||||||
|
MRJobConfig.SHUFFLE_INDEX_CACHE),
|
||||||
|
new DeprecationDelta("mapreduce.tasktracker.indexcache.mb",
|
||||||
|
MRJobConfig.SHUFFLE_INDEX_CACHE),
|
||||||
new DeprecationDelta("jobclient.completion.poll.interval",
|
new DeprecationDelta("jobclient.completion.poll.interval",
|
||||||
Job.COMPLETION_POLL_INTERVAL_KEY),
|
Job.COMPLETION_POLL_INTERVAL_KEY),
|
||||||
new DeprecationDelta("jobclient.progress.monitor.poll.interval",
|
new DeprecationDelta("jobclient.progress.monitor.poll.interval",
|
||||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -56,7 +56,7 @@ public class TestIndexCache {
|
||||||
r.setSeed(seed);
|
r.setSeed(seed);
|
||||||
System.out.println("seed: " + seed);
|
System.out.println("seed: " + seed);
|
||||||
fs.delete(p, true);
|
fs.delete(p, true);
|
||||||
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
|
conf.setInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 1);
|
||||||
final int partsPerMap = 1000;
|
final int partsPerMap = 1000;
|
||||||
final int bytesPerFile = partsPerMap * 24;
|
final int bytesPerFile = partsPerMap * 24;
|
||||||
IndexCache cache = new IndexCache(conf);
|
IndexCache cache = new IndexCache(conf);
|
||||||
|
@ -127,7 +127,7 @@ public class TestIndexCache {
|
||||||
public void testBadIndex() throws Exception {
|
public void testBadIndex() throws Exception {
|
||||||
final int parts = 30;
|
final int parts = 30;
|
||||||
fs.delete(p, true);
|
fs.delete(p, true);
|
||||||
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
|
conf.setInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 1);
|
||||||
IndexCache cache = new IndexCache(conf);
|
IndexCache cache = new IndexCache(conf);
|
||||||
|
|
||||||
Path f = new Path(p, "badindex");
|
Path f = new Path(p, "badindex");
|
||||||
|
@ -159,7 +159,7 @@ public class TestIndexCache {
|
||||||
@Test
|
@Test
|
||||||
public void testInvalidReduceNumberOrLength() throws Exception {
|
public void testInvalidReduceNumberOrLength() throws Exception {
|
||||||
fs.delete(p, true);
|
fs.delete(p, true);
|
||||||
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
|
conf.setInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 1);
|
||||||
final int partsPerMap = 1000;
|
final int partsPerMap = 1000;
|
||||||
final int bytesPerFile = partsPerMap * 24;
|
final int bytesPerFile = partsPerMap * 24;
|
||||||
IndexCache cache = new IndexCache(conf);
|
IndexCache cache = new IndexCache(conf);
|
||||||
|
@ -205,7 +205,7 @@ public class TestIndexCache {
|
||||||
// fails with probability of 100% on code before MAPREDUCE-2541,
|
// fails with probability of 100% on code before MAPREDUCE-2541,
|
||||||
// so it is repeatable in practice.
|
// so it is repeatable in practice.
|
||||||
fs.delete(p, true);
|
fs.delete(p, true);
|
||||||
conf.setInt(TTConfig.TT_INDEX_CACHE, 10);
|
conf.setInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 10);
|
||||||
// Make a big file so removeMapThread almost surely runs faster than
|
// Make a big file so removeMapThread almost surely runs faster than
|
||||||
// getInfoThread
|
// getInfoThread
|
||||||
final int partsPerMap = 100000;
|
final int partsPerMap = 100000;
|
||||||
|
@ -251,7 +251,7 @@ public class TestIndexCache {
|
||||||
@Test
|
@Test
|
||||||
public void testCreateRace() throws Exception {
|
public void testCreateRace() throws Exception {
|
||||||
fs.delete(p, true);
|
fs.delete(p, true);
|
||||||
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
|
conf.setInt(MRJobConfig.SHUFFLE_INDEX_CACHE, 1);
|
||||||
final int partsPerMap = 1000;
|
final int partsPerMap = 1000;
|
||||||
final int bytesPerFile = partsPerMap * 24;
|
final int bytesPerFile = partsPerMap * 24;
|
||||||
final IndexCache cache = new IndexCache(conf);
|
final IndexCache cache = new IndexCache(conf);
|
||||||
|
|
Loading…
Reference in New Issue