From bdbd10fde1539920de937404a785e6ed34dd5628 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Fri, 8 May 2015 10:18:29 -0700 Subject: [PATCH] MAPREDUCE-2632. Avoid calling the partitioner when the numReduceTasks is 1. (Ravi Teja Ch N V and Sunil G via kasha) --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapred/MapFileOutputFormat.java | 8 ++- .../org/apache/hadoop/mapred/Partitioner.java | 3 + .../apache/hadoop/mapreduce/Partitioner.java | 9 ++- .../lib/output/MapFileOutputFormat.java | 8 ++- .../mapred/TestMapFileOutputFormat.java | 70 +++++++++++++++++++ .../lib/output/TestMapFileOutputFormat.java | 65 +++++++++++++++++ 7 files changed, 161 insertions(+), 5 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapFileOutputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMapFileOutputFormat.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c97a7f62455..3df8fd52664 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -109,6 +109,9 @@ Trunk (Unreleased) MAPREDUCE-6057. Remove obsolete entries from mapred-default.xml (Ray Chiang via aw) + MAPREDUCE-2632. Avoid calling the partitioner when the numReduceTasks is 1. + (Ravi Teja Ch N V and Sunil G via kasha) + BUG FIXES MAPREDUCE-6191. Improve clearing stale state of Java serialization diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapFileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapFileOutputFormat.java index 374a6c79ae9..bc746c56010 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapFileOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapFileOutputFormat.java @@ -95,7 +95,13 @@ Writable getEntry(MapFile.Reader[] readers, Partitioner partitioner, K key, V value) throws IOException { - int part = partitioner.getPartition(key, value, readers.length); + int readerLength = readers.length; + int part; + if (readerLength <= 1) { + part = 0; + } else { + part = partitioner.getPartition(key, value, readers.length); + } return readers[part].get(key, value); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Partitioner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Partitioner.java index 1aa0ab1f92c..2e28f5db192 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Partitioner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Partitioner.java @@ -30,6 +30,9 @@ * is the same as the number of reduce tasks for the job. Hence this controls * which of the m reduce tasks the intermediate key (and hence the * record) is sent for reduction.

+ * + *

Note: A Partitioner is created only when there are multiple + * reducers.

* * @see Reducer */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Partitioner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Partitioner.java index 3b846c23327..7fdb83dc3e7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Partitioner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Partitioner.java @@ -31,9 +31,12 @@ * is the same as the number of reduce tasks for the job. Hence this controls * which of the m reduce tasks the intermediate key (and hence the * record) is sent for reduction.

- * - * Note: If you require your Partitioner class to obtain the Job's configuration - * object, implement the {@link Configurable} interface. + * + *

Note: A Partitioner is created only when there are multiple + * reducers.

+ * + *

Note: If you require your Partitioner class to obtain the Job's + * configuration object, implement the {@link Configurable} interface.

* * @see Reducer */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java index da33770b4ba..0724c85330e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/MapFileOutputFormat.java @@ -114,7 +114,13 @@ public boolean accept(Path path) { public static , V extends Writable> Writable getEntry(MapFile.Reader[] readers, Partitioner partitioner, K key, V value) throws IOException { - int part = partitioner.getPartition(key, value, readers.length); + int readerLength = readers.length; + int part; + if (readerLength <= 1) { + part = 0; + } else { + part = partitioner.getPartition(key, value, readers.length); + } return readers[part].get(key, value); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapFileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapFileOutputFormat.java new file mode 100644 index 00000000000..7e315ae7a31 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMapFileOutputFormat.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.MapFile.Reader; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestMapFileOutputFormat { + + @SuppressWarnings("static-access") + @Test + public void testPartitionerShouldNotBeCalledWhenOneReducerIsPresent() + throws Exception { + MapFileOutputFormat outputFormat = new MapFileOutputFormat(); + Reader reader = Mockito.mock(Reader.class); + Reader[] readers = new Reader[]{reader}; + outputFormat.getEntry(readers, new MyPartitioner(), new Text(), new Text()); + assertTrue(!MyPartitioner.isGetPartitionCalled()); + } + + protected void tearDown() throws Exception { + MyPartitioner.setGetPartitionCalled(false); + }; + private static class MyPartitioner + implements + Partitioner { + private static boolean getPartitionCalled = false; + + @Override + public int getPartition(WritableComparable key, Writable value, + int numPartitions) { + setGetPartitionCalled(true); + return -1; + } + + public static boolean isGetPartitionCalled() { + return getPartitionCalled; + } + + @Override + public void configure(JobConf job) { + } + + public static void setGetPartitionCalled(boolean getPartitionCalled) { + MyPartitioner.getPartitionCalled = getPartitionCalled; + } + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMapFileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMapFileOutputFormat.java new file mode 100644 index 00000000000..82758f1557b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestMapFileOutputFormat.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.MapFile.Reader; +import org.apache.hadoop.mapreduce.Partitioner; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestMapFileOutputFormat { + + @SuppressWarnings("static-access") + @Test + public void testPartitionerShouldNotBeCalledWhenOneReducerIsPresent() + throws Exception { + MapFileOutputFormat outputFormat = new MapFileOutputFormat(); + Reader reader = Mockito.mock(Reader.class); + Reader[] readers = new Reader[]{reader}; + outputFormat.getEntry(readers, new MyPartitioner(), new Text(), new Text()); + assertTrue(!MyPartitioner.isGetPartitionCalled()); + } + + public void tearDown() throws Exception { + MyPartitioner.setGetPartitionCalled(false); + } + private static class MyPartitioner + extends + Partitioner { + private static boolean getPartitionCalled = false; + + public static boolean isGetPartitionCalled() { + return getPartitionCalled; + } + @Override + public int getPartition(WritableComparable key, Writable value, + int numPartitions) { + setGetPartitionCalled(true); + return -1; + } + public static void setGetPartitionCalled(boolean getPartitionCalled) { + MyPartitioner.getPartitionCalled = getPartitionCalled; + } + } +} \ No newline at end of file