diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 45ae65fcb89..cb6503fed0a 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -16,6 +16,9 @@ Trunk (Unreleased)
 
     MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
 
+    MAPREDUCE-4887. Add RehashPartitioner, to smooth distributions
+    with poor implementations of Object#hashCode(). (Radim Kolar via cutting)
+
   IMPROVEMENTS
 
     MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/RehashPartitioner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/RehashPartitioner.java
new file mode 100644
index 00000000000..ffc3938a815
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/RehashPartitioner.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * This partitioner rehashes values returned by {@link Object#hashCode()}
+ * to get a smoother distribution between partitions, which may improve
+ * reduce time in some cases and should not hurt in any case.
+ * This partitioner is suggested for Integer and Long keys with simple
+ * patterns in their distributions.
+ * @since 2.0.3
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class RehashPartitioner<K, V> extends Partitioner<K, V> {
+
+  /** prime number seed for increasing hash quality */
+  private static final int SEED = 1591267453;
+
+  /** Rehash {@link Object#hashCode()} to partition. */
+  public int getPartition(K key, V value, int numReduceTasks) {
+    int h = SEED ^ key.hashCode();
+    h ^= (h >>> 20) ^ (h >>> 12);
+    h = h ^ (h >>> 7) ^ (h >>> 4);
+
+    return (h & Integer.MAX_VALUE) % numReduceTasks;
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestRehashPartitioner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestRehashPartitioner.java
new file mode 100644
index 00000000000..d2048c1686c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestRehashPartitioner.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.partition;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import org.junit.*;
+
+public class TestRehashPartitioner {
+
+  /** number of partitions */
+  private static final int PARTITIONS = 32;
+
+  /** step in sequence */
+  private static final int STEP = 3;
+
+  /** end of test sequence */
+  private static final int END = 100000;
+
+  /** maximum relative error before a bucket counts as too big/small */
+  private static final double MAX_ERROR = 0.20;
+
+  /** maximum fraction of oddly sized buckets */
+  private static final double MAX_BADBUCKETS = 0.10;
+
+  /** test partitioner for patterns */
+  @Test
+  public void testPatterns() {
+    int[] results = new int[PARTITIONS];
+    RehashPartitioner<IntWritable, NullWritable> p = new RehashPartitioner<IntWritable, NullWritable>();
+    /* test sequence 0, 3, 6, ..., 99999 */
+    for (int i = 0; i < END; i += STEP) {
+      results[p.getPartition(new IntWritable(i), null, PARTITIONS)]++;
+    }
+    int badbuckets = 0;
+    Integer min = Collections.min(Arrays.asList(ArrayUtils.toObject(results)));
+    Integer max = Collections.max(Arrays.asList(ArrayUtils.toObject(results)));
+    Integer avg = (int) Math.round((max + min) / 2.0);
+    System.out.println("Dumping buckets distribution: min=" + min + " avg=" + avg + " max=" + max);
+    for (int i = 0; i < PARTITIONS; i++) {
+      double var = (results[i] - avg) / (double) avg;
+      System.out.println("bucket " + i + " " + results[i] + " items, relative deviation " + var);
+      if (Math.abs(var) > MAX_ERROR)
+        badbuckets++;
+    }
+    System.out.println(badbuckets + " of " + PARTITIONS + " buckets are too small or too large");
+    assertTrue("too many overflow buckets", badbuckets < PARTITIONS * MAX_BADBUCKETS);
+  }
+}
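
For reviewers who want to try the new class, here is a minimal sketch of wiring it into a job. The driver class and job name are hypothetical, not part of this patch; Job.getInstance and Job#setPartitionerClass are the standard MapReduce 2 APIs. Note that the bit-mixing in getPartition is the supplemental hash historically used by java.util.HashMap, so keys whose hashCode() values follow a regular pattern are spread across the whole int range before the modulo is taken.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.RehashPartitioner;

// Hypothetical driver: class name and job wiring are illustrative only.
public class RehashExample {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "rehash-example");
    job.setJarByClass(RehashExample.class);
    // Keys whose hashCode() values share a common stride (e.g. multiples
    // of a constant that shares factors with the partition count) collapse
    // onto few partitions under the default HashPartitioner;
    // RehashPartitioner mixes the bits first so the modulo sees a
    // well-spread value.
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setPartitionerClass(RehashPartitioner.class);
    // ... set mapper, reducer, and input/output paths as usual ...
  }
}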