diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index c98ce087b80..3c1aed5301f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -200,6 +200,9 @@ Release 2.5.0 - UNRELEASED HADOOP-10622. Shell.runCommand can deadlock (Gera Shegalov via jlowe) + HADOOP-10686. Writables are not always configured. + (Abraham Elmahrek via kasha) + BREAKDOWN OF HADOOP-10514 SUBTASKS AND RELATED JIRAS HADOOP-10520. Extended attributes definition and FileSystem APIs for diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java index 1a571d66593..84c9dcc0877 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java @@ -256,7 +256,7 @@ public class MapFile { } else { keyClass= (Class) keyClassOption.getValue(); - this.comparator = WritableComparator.get(keyClass); + this.comparator = WritableComparator.get(keyClass, conf); } this.lastKey = comparator.newKey(); FileSystem fs = dirName.getFileSystem(conf); @@ -428,12 +428,13 @@ public class MapFile { this.data = createDataFileReader(dataFile, conf, options); this.firstPosition = data.getPosition(); - if (comparator == null) - this.comparator = - WritableComparator.get(data.getKeyClass(). - asSubclass(WritableComparable.class)); - else + if (comparator == null) { + Class cls; + cls = data.getKeyClass().asSubclass(WritableComparable.class); + this.comparator = WritableComparator.get(cls, conf); + } else { this.comparator = comparator; + } // open the index SequenceFile.Reader.Option[] indexOptions = diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index 3c8e43671f6..19036cf8412 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -2676,7 +2676,7 @@ public class SequenceFile { /** Sort and merge files containing the named classes. */ public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf) { - this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf); + this(fs, WritableComparator.get(keyClass, conf), keyClass, valClass, conf); } /** Sort and merge using an arbitrary {@link RawComparator}. */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SetFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SetFile.java index 068ca9d40e5..118cce75136 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SetFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SetFile.java @@ -52,7 +52,7 @@ public class SetFile extends MapFile { Class keyClass, SequenceFile.CompressionType compress) throws IOException { - this(conf, fs, dirName, WritableComparator.get(keyClass), compress); + this(conf, fs, dirName, WritableComparator.get(keyClass, conf), compress); } /** Create a set naming the element comparator and compression type. */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WritableComparator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WritableComparator.java index d2cbe3bd621..b2738ffa803 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WritableComparator.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/WritableComparator.java @@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; /** A Comparator for {@link WritableComparable}s. @@ -37,13 +39,21 @@ import org.apache.hadoop.util.ReflectionUtils; */ @InterfaceAudience.Public @InterfaceStability.Stable -public class WritableComparator implements RawComparator { +public class WritableComparator implements RawComparator, Configurable { private static final ConcurrentHashMap comparators = new ConcurrentHashMap(); // registry - /** Get a comparator for a {@link WritableComparable} implementation. */ + private Configuration conf; + + /** For backwards compatibility. **/ public static WritableComparator get(Class c) { + return get(c, null); + } + + /** Get a comparator for a {@link WritableComparable} implementation. */ + public static WritableComparator get( + Class c, Configuration conf) { WritableComparator comparator = comparators.get(c); if (comparator == null) { // force the static initializers to run @@ -52,12 +62,24 @@ public class WritableComparator implements RawComparator { comparator = comparators.get(c); // if not, use the generic one if (comparator == null) { - comparator = new WritableComparator(c, true); + comparator = new WritableComparator(c, conf, true); } } + // Newly passed Configuration objects should be used. + ReflectionUtils.setConf(comparator, conf); return comparator; } + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + /** * Force initialization of the static members. * As of Java 5, referencing a class doesn't force it to initialize. Since @@ -91,12 +113,19 @@ public class WritableComparator implements RawComparator { /** Construct for a {@link WritableComparable} implementation. */ protected WritableComparator(Class keyClass) { - this(keyClass, false); + this(keyClass, null, false); } protected WritableComparator(Class keyClass, boolean createInstances) { + this(keyClass, null, createInstances); + } + + protected WritableComparator(Class keyClass, + Configuration conf, + boolean createInstances) { this.keyClass = keyClass; + this.conf = (conf != null) ? conf : new Configuration(); if (createInstances) { key1 = newKey(); key2 = newKey(); @@ -112,7 +141,7 @@ public class WritableComparator implements RawComparator { /** Construct a new {@link WritableComparable} instance. */ public WritableComparable newKey() { - return ReflectionUtils.newInstance(keyClass, null); + return ReflectionUtils.newInstance(keyClass, conf); } /** Optimization hook. Override this to make SequenceFile.Sorter's scream. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWritable.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWritable.java index 971e237d50b..41dfb7a73d4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWritable.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWritable.java @@ -23,6 +23,7 @@ import java.io.DataOutput; import java.io.IOException; import java.util.Random; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; @@ -30,6 +31,11 @@ import junit.framework.TestCase; /** Unit tests for Writable. */ public class TestWritable extends TestCase { +private static final String TEST_CONFIG_PARAM = "frob.test"; +private static final String TEST_CONFIG_VALUE = "test"; +private static final String TEST_WRITABLE_CONFIG_PARAM = "test.writable"; +private static final String TEST_WRITABLE_CONFIG_VALUE = TEST_CONFIG_VALUE; + public TestWritable(String name) { super(name); } /** Example class used in test cases below. */ @@ -64,6 +70,25 @@ public class TestWritable extends TestCase { } } + public static class SimpleWritableComparable extends SimpleWritable + implements WritableComparable, Configurable { + private Configuration conf; + + public SimpleWritableComparable() {} + + public void setConf(Configuration conf) { + this.conf = conf; + } + + public Configuration getConf() { + return this.conf; + } + + public int compareTo(SimpleWritableComparable o) { + return this.state - o.state; + } + } + /** Test 1: Check that SimpleWritable. */ public void testSimpleWritable() throws Exception { testWritable(new SimpleWritable()); @@ -121,9 +146,34 @@ public class TestWritable extends TestCase { @Override public int compareTo(Frob o) { return 0; } } - /** Test that comparator is defined. */ + /** Test that comparator is defined and configured. */ public static void testGetComparator() throws Exception { - assert(WritableComparator.get(Frob.class) instanceof FrobComparator); + Configuration conf = new Configuration(); + + // Without conf. + WritableComparator frobComparator = WritableComparator.get(Frob.class); + assert(frobComparator instanceof FrobComparator); + assertNotNull(frobComparator.getConf()); + assertNull(frobComparator.getConf().get(TEST_CONFIG_PARAM)); + + // With conf. + conf.set(TEST_CONFIG_PARAM, TEST_CONFIG_VALUE); + frobComparator = WritableComparator.get(Frob.class, conf); + assert(frobComparator instanceof FrobComparator); + assertNotNull(frobComparator.getConf()); + assertEquals(conf.get(TEST_CONFIG_PARAM), TEST_CONFIG_VALUE); + + // Without conf. should reuse configuration. + frobComparator = WritableComparator.get(Frob.class); + assert(frobComparator instanceof FrobComparator); + assertNotNull(frobComparator.getConf()); + assertEquals(conf.get(TEST_CONFIG_PARAM), TEST_CONFIG_VALUE); + + // New conf. should use new configuration. + frobComparator = WritableComparator.get(Frob.class, new Configuration()); + assert(frobComparator instanceof FrobComparator); + assertNotNull(frobComparator.getConf()); + assertNull(frobComparator.getConf().get(TEST_CONFIG_PARAM)); } /** @@ -153,4 +203,17 @@ public class TestWritable extends TestCase { .compare(writable1, writable3) == 0); } + /** + * Test that Writable's are configured by Comparator. + */ + public void testConfigurableWritableComparator() throws Exception { + Configuration conf = new Configuration(); + conf.set(TEST_WRITABLE_CONFIG_PARAM, TEST_WRITABLE_CONFIG_VALUE); + + WritableComparator wc = WritableComparator.get(SimpleWritableComparable.class, conf); + SimpleWritableComparable key = ((SimpleWritableComparable)wc.newKey()); + assertNotNull(wc.getConf()); + assertNotNull(key.getConf()); + assertEquals(key.getConf().get(TEST_WRITABLE_CONFIG_PARAM), TEST_WRITABLE_CONFIG_VALUE); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java index 17ae55e373e..861c47bbb4c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java @@ -112,7 +112,7 @@ import org.apache.log4j.Level; @InterfaceAudience.Public @InterfaceStability.Stable public class JobConf extends Configuration { - + private static final Log LOG = LogFactory.getLog(JobConf.class); static{ @@ -882,7 +882,7 @@ public class JobConf extends Configuration { JobContext.KEY_COMPARATOR, null, RawComparator.class); if (theClass != null) return ReflectionUtils.newInstance(theClass, this); - return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class)); + return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java index 8bb5fcd2efd..0684268d2d7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java @@ -131,7 +131,7 @@ public abstract class CompositeRecordReader< public void add(ComposableRecordReader rr) throws IOException { kids[rr.id()] = rr; if (null == q) { - cmp = WritableComparator.get(rr.createKey().getClass()); + cmp = WritableComparator.get(rr.createKey().getClass(), conf); q = new PriorityQueue>(3, new Comparator>() { public int compare(ComposableRecordReader o1, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java index 5b38ba2c20e..a86f32ead9f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; @@ -38,7 +40,7 @@ import org.apache.hadoop.mapred.RecordReader; @InterfaceStability.Stable public class WrappedRecordReader - implements ComposableRecordReader { + implements ComposableRecordReader, Configurable { private boolean empty = false; private RecordReader rr; @@ -47,6 +49,7 @@ public class WrappedRecordReader vjoin; @@ -55,13 +58,20 @@ public class WrappedRecordReader rr, Class cmpcl) throws IOException { + this(id, rr, cmpcl, null); + } + + WrappedRecordReader(int id, RecordReader rr, + Class cmpcl, + Configuration conf) throws IOException { this.id = id; this.rr = rr; + this.conf = (conf == null) ? new Configuration() : conf; khead = rr.createKey(); vhead = rr.createValue(); try { cmp = (null == cmpcl) - ? WritableComparator.get(khead.getClass()) + ? WritableComparator.get(khead.getClass(), this.conf) : cmpcl.newInstance(); } catch (InstantiationException e) { throw (IOException)new IOException().initCause(e); @@ -207,4 +217,13 @@ public class WrappedRecordReader>(3, new Comparator>() { public int compare(ComposableRecordReader o1, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java index d3521d4389e..28310d017af 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java @@ -92,7 +92,7 @@ public class WrappedRecordReader, keyclass = key.getClass().asSubclass(WritableComparable.class); valueclass = value.getClass(); if (cmp == null) { - cmp = WritableComparator.get(keyclass); + cmp = WritableComparator.get(keyclass, conf); } } } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 33c3ed73323..81d0a20e2ee 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -224,6 +224,9 @@ Release 2.5.0 - UNRELEASED YARN-2075. Fixed the test failure of TestRMAdminCLI. (Kenji Kikushima via zjshen) + YARN-2155. FairScheduler: Incorrect threshold check for preemption. + (Wei Yan via kasha) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 5725f8c383b..ea53165c632 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1072,8 +1072,8 @@ public class FairScheduler extends private boolean shouldAttemptPreemption() { if (preemptionEnabled) { return (preemptionUtilizationThreshold < Math.max( - (float) rootMetrics.getAvailableMB() / clusterResource.getMemory(), - (float) rootMetrics.getAvailableVirtualCores() / + (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(), + (float) rootMetrics.getAllocatedVirtualCores() / clusterResource.getVirtualCores())); } return false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 2098e1679b9..310104bd084 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -146,7 +146,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { // Create node with 4GB memory and 4 vcores registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024); - // Verify submitting another request doesn't trigger preemption + // Verify submitting another request triggers preemption createSchedulingRequest(1024, "queueB", "user1", 1, 1); scheduler.update(); clock.tick(6); @@ -171,5 +171,21 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { scheduler.preemptTasksIfNecessary(); assertEquals("preemptResources() should not have been called", -1, ((StubbedFairScheduler) scheduler).lastPreemptMemory); + + resourceManager.stop(); + + startResourceManager(0.7f); + // Create node with 4GB memory and 4 vcores + registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024); + + // Verify submitting another request triggers preemption + createSchedulingRequest(1024, "queueB", "user1", 1, 1); + scheduler.update(); + clock.tick(6); + + ((StubbedFairScheduler) scheduler).resetLastPreemptResources(); + scheduler.preemptTasksIfNecessary(); + assertEquals("preemptResources() should have been called", 1024, + ((StubbedFairScheduler) scheduler).lastPreemptMemory); } }