From cc617140bf9220c1dea470a6a90c993da4c4c8e3 Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Thu, 11 Mar 2021 20:41:26 -0800
Subject: [PATCH] HBASE-25566 RoundRobinTableInputFormat (#2947)

Co-authored-by: stack
Co-authored-by: sudhir-reddy
Co-authored-by: Huaxiang Sun
---
 .../mapreduce/RoundRobinTableInputFormat.java | 173 +++++++++++++++++
 .../hbase/mapreduce/TableMapReduceUtil.java   |  16 +-
 .../hadoop/hbase/mapreduce/TableSplit.java    |  14 +-
 .../TestRoundRobinTableInputFormat.java       | 177 ++++++++++++++++++
 4 files changed, 370 insertions(+), 10 deletions(-)
 create mode 100644 hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RoundRobinTableInputFormat.java
 create mode 100644 hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRoundRobinTableInputFormat.java

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RoundRobinTableInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RoundRobinTableInputFormat.java
new file mode 100644
index 00000000000..2b15e004b9a
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RoundRobinTableInputFormat.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Process the return from super-class {@link TableInputFormat} (TIF) so as to undo any clumping
+ * of {@link InputSplit}s around RegionServers. Spread splits broadly to distribute read-load over
+ * RegionServers in the cluster. The super-class TIF returns splits in hbase:meta table order.
+ * Adjacent or near-adjacent hbase:meta Regions can be hosted on the same RegionServer -- nothing
+ * prevents this. This hbase:meta ordering of InputSplit placement can be lumpy, making it so some
+ * RegionServers end up hosting lots of InputSplit scans while, contemporaneously, other
+ * RegionServers host few or none. This class does a pass over the return from the super-class to
+ * better spread the load. See the helpful Flipkart blog post below for a description and for the
+ * source from which the base of this code comes (with permission).
+ * @see https://tech.flipkart.com/is-data-locality-always-out-of-the-box-in-hadoop-not-really-2ae9c95163cb
+ */
+@InterfaceAudience.Public
+public class RoundRobinTableInputFormat extends TableInputFormat {
+  private Boolean hbaseRegionsizecalculatorEnableOriginalValue = null;
+  /**
+   * Boolean config for whether the superclass should produce InputSplits with 'lengths'. If true,
+   * TIF will query every RegionServer to get the 'size' of all involved Regions and this 'size'
+   * will be used as the InputSplit length. If false, we skip this query and the super-class's
+   * returned InputSplits will have lengths of zero. This override sets the flag to false.
+   * All returned lengths will be zero. Makes it so sorting on 'length' becomes a noop. The sort
+   * returned by this override will prevail. That's what we want.
+   */
+  static String HBASE_REGIONSIZECALCULATOR_ENABLE = "hbase.regionsizecalculator.enable";
+
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException {
+    try {
+      // Do a round robin on what we get back from the super-class.
+      configure();
+      return roundRobin(getSuperSplits(context));
+    } finally {
+      unconfigure();
+    }
+  }
+
+  /**
+   * Call the super-class's getSplits. Have it out here as its own method so it can be overridden.
+   */
+  List<InputSplit> getSuperSplits(JobContext context) throws IOException {
+    return super.getSplits(context);
+  }
+
+  /**
+   * Spread the splits list so as to avoid clumping on RegionServers. Order splits so every server
+   * gets one split before a server gets a second, and so on; i.e. round-robin the splits amongst
+   * the servers in the cluster.
+   */
+  List<InputSplit> roundRobin(List<InputSplit> inputs) throws IOException {
+    if ((inputs == null) || inputs.isEmpty()) {
+      return inputs;
+    }
+    List<InputSplit> result = new ArrayList<>(inputs.size());
+    // Prepare a hashmap with each region server as key and list of Input Splits as value
+    Map<String, List<InputSplit>> regionServerSplits = new HashMap<>();
+    for (InputSplit is: inputs) {
+      if (is instanceof TableSplit) {
+        String regionServer = ((TableSplit)is).getRegionLocation();
+        if (regionServer != null && !regionServer.isEmpty()) {
+          regionServerSplits.computeIfAbsent(regionServer, k -> new LinkedList<>()).add(is);
+          continue;
+        }
+      }
+      // If not a TableSplit, or the region server was not found, add it anyway.
+      result.add(is);
+    }
+    // Write out splits in a manner that spreads splits for a RegionServer to avoid 'clumping'.
+    while (!regionServerSplits.isEmpty()) {
+      Iterator<String> iterator = regionServerSplits.keySet().iterator();
+      while (iterator.hasNext()) {
+        String regionServer = iterator.next();
+        List<InputSplit> inputSplitListForRegion = regionServerSplits.get(regionServer);
+        if (!inputSplitListForRegion.isEmpty()) {
+          result.add(inputSplitListForRegion.remove(0));
+        }
+        if (inputSplitListForRegion.isEmpty()) {
+          iterator.remove();
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Adds a configuration to the Context disabling remote rpc'ing to figure Region size
+   * when calculating InputSplits. See up in super-class TIF where we rpc to every server to find
+   * the size of all involved Regions. Here we disable this super-class action. This means
+   * InputSplits will have a length of zero. If all InputSplits have zero length, the ordering
+   * done in here will 'pass-through' Hadoop's length-first sort. The superclass TIF will
+   * ask every node for the current size of each of the participating Table Regions. It does this
+   * because it wants to schedule the biggest Regions first (this fixation comes from hadoop
+   * itself -- see JobSubmitter where it sorts inputs by size).
+   * This extra diligence takes time and is of no utility in this RRTIF, where spread is of more
+   * import than size-first. Also, if a rolling restart is happening when we go to launch the job,
+   * the job launch may fail because the request for Region size fails -- even after retries --
+   * because a rolled RegionServer may take a while to come online: e.g. it can take Java 90
+   * seconds to allocate a 160G heap, and the RegionServer is offline during this time. The job
+   * launch will fail with 'Connection rejected'. So, we set 'hbase.regionsizecalculator.enable'
+   * to false here in RRTIF.
+   * @see #unconfigure()
+   */
+  void configure() {
+    if (getConf().get(HBASE_REGIONSIZECALCULATOR_ENABLE) != null) {
+      this.hbaseRegionsizecalculatorEnableOriginalValue = getConf().
+        getBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE, true);
+    }
+    getConf().setBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE, false);
+  }
+
+  /**
+   * @see #configure()
+   */
+  void unconfigure() {
+    if (this.hbaseRegionsizecalculatorEnableOriginalValue == null) {
+      getConf().unset(HBASE_REGIONSIZECALCULATOR_ENABLE);
+    } else {
+      getConf().setBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE,
+        this.hbaseRegionsizecalculatorEnableOriginalValue);
+    }
+  }
+
+  /**
+   * Pass the table name as the argument. Set the zk ensemble to use with the System property
+   * 'hbase.zookeeper.quorum'.
+   */
+  public static void main(String[] args) throws IOException {
+    TableInputFormat tif = new RoundRobinTableInputFormat();
+    final Configuration configuration = HBaseConfiguration.create();
+    configuration.setBoolean("hbase.regionsizecalculator.enable", false);
+    configuration.set(HConstants.ZOOKEEPER_QUORUM,
+      System.getProperty(HConstants.ZOOKEEPER_QUORUM, "localhost"));
+    configuration.set(TableInputFormat.INPUT_TABLE, args[0]);
+    tif.setConf(configuration);
+    List<InputSplit> splits = tif.getSplits(new JobContextImpl(configuration, new JobID()));
+    for (InputSplit split: splits) {
+      System.out.println(split);
+    }
+  }
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index 1425e9b7425..5a6071e0186 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -71,6 +71,7 @@ import com.codahale.metrics.MetricRegistry;
 @InterfaceAudience.Public
 public class TableMapReduceUtil {
   private static final Logger LOG = LoggerFactory.getLogger(TableMapReduceUtil.class);
+  public static final String TABLE_INPUT_CLASS_KEY = "hbase.table.input.class";
 
   /**
    * Use this before submitting a TableMap job. It will appropriately set up
@@ -264,8 +265,17 @@ public class TableMapReduceUtil {
       Class<?> outputValueClass, Job job,
       boolean addDependencyJars)
   throws IOException {
-    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass,
-        outputValueClass, job, addDependencyJars, TableInputFormat.class);
+    initTableMapperJob(Bytes.toString(table), scan, mapper, outputKeyClass, outputValueClass, job,
+      addDependencyJars, getConfiguredInputFormat(job));
+  }
+
+  /**
+   * @return {@link TableInputFormat} .class unless the Configuration has something else at
+   * {@link #TABLE_INPUT_CLASS_KEY}.
+   */
+  private static Class<? extends InputFormat> getConfiguredInputFormat(Job job) {
+    return (Class<? extends InputFormat>)job.getConfiguration().
+      getClass(TABLE_INPUT_CLASS_KEY, TableInputFormat.class);
+  }
 
   /**
@@ -290,7 +300,7 @@ public class TableMapReduceUtil {
       boolean addDependencyJars)
   throws IOException {
     initTableMapperJob(table, scan, mapper, outputKeyClass,
-        outputValueClass, job, addDependencyJars, TableInputFormat.class);
+        outputValueClass, job, addDependencyJars, getConfiguredInputFormat(job));
   }
 
   /**
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
index acce55e82ce..93300ebb0f3 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java
@@ -352,8 +352,8 @@ public class TableSplit extends InputSplit
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("HBase table split(");
-    sb.append("table name: ").append(tableName);
+    sb.append("Split(");
+    sb.append("tablename=").append(tableName);
     // null scan input is represented by ""
     String printScan = "";
     if (!scan.equals("")) {
@@ -364,12 +364,12 @@ public class TableSplit extends InputSplit
       catch (IOException e) {
         printScan = "";
       }
+      sb.append(", scan=").append(printScan);
     }
-    sb.append(", scan: ").append(printScan);
-    sb.append(", start row: ").append(Bytes.toStringBinary(startRow));
-    sb.append(", end row: ").append(Bytes.toStringBinary(endRow));
-    sb.append(", region location: ").append(regionLocation);
-    sb.append(", encoded region name: ").append(encodedRegionName);
+    sb.append(", startrow=").append(Bytes.toStringBinary(startRow));
+    sb.append(", endrow=").append(Bytes.toStringBinary(endRow));
+    sb.append(", regionLocation=").append(regionLocation);
+    sb.append(", regionname=").append(encodedRegionName);
     sb.append(")");
     return sb.toString();
   }
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRoundRobinTableInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRoundRobinTableInputFormat.java
new file mode 100644
index 00000000000..c3abf4d544e
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestRoundRobinTableInputFormat.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Basic test of {@link RoundRobinTableInputFormat}; i.e. RRTIF.
+ */
+@Category({SmallTests.class})
+public class TestRoundRobinTableInputFormat {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRoundRobinTableInputFormat.class);
+
+  private static final int SERVERS_COUNT = 5;
+  private static final String[] KEYS = {
+    "aa", "ab", "ac", "ad", "ae",
+    "ba", "bb", "bc", "bd", "be",
+    "ca", "cb", "cc", "cd", "ce",
+    "da", "db", "dc", "dd", "de",
+    "ea", "eb", "ec", "ed", "ee",
+    "fa", "fb", "fc", "fd", "fe",
+    "ga", "gb", "gc", "gd", "ge",
+    "ha", "hb", "hc", "hd", "he",
+    "ia", "ib", "ic", "id", "ie",
+    "ja", "jb", "jc", "jd", "je", "jf"
+  };
+
+  /**
+   * Test default behavior.
+   */
+  @Test
+  public void testRoundRobinSplit() throws IOException, InterruptedException {
+    final List<InputSplit> splits = createSplits();
+    Collections.shuffle(splits);
+    List<InputSplit> sortedSplits = new RoundRobinTableInputFormat().roundRobin(splits);
+    testDistribution(sortedSplits);
+    // Now test that order is preserved even after being passed through the SplitComparator
+    // that sorts InputSplit by length as is done up in Hadoop in JobSubmitter.
+    List<InputSplit> copy = new ArrayList<>(sortedSplits);
+    Arrays.sort(copy.toArray(new InputSplit[0]), new SplitComparator());
+    // Assert the sort is retained even after passing through SplitComparator.
+    for (int i = 0; i < sortedSplits.size(); i++) {
+      TableSplit sortedTs = (TableSplit)sortedSplits.get(i);
+      TableSplit copyTs = (TableSplit)copy.get(i);
+      assertEquals(sortedTs.getEncodedRegionName(), copyTs.getEncodedRegionName());
+    }
+  }
+
+  /**
+   * @return Splits made out of {@link #KEYS}. Splits are for five Servers. Length is ZERO!
+   */
+  private List<InputSplit> createSplits() {
+    List<InputSplit> splits = new ArrayList<>(KEYS.length - 1);
+    for (int i = 0; i < KEYS.length - 1; i++) {
+      InputSplit split = new TableSplit(TableName.valueOf("test"), new Scan(),
+        Bytes.toBytes(KEYS[i]), Bytes.toBytes(KEYS[i + 1]), String.valueOf(i % SERVERS_COUNT + 1),
+        "", 0);
+      splits.add(split);
+    }
+    return splits;
+  }
+
+  private void testDistribution(List<InputSplit> list) throws IOException, InterruptedException {
+    for (int i = 0; i < KEYS.length / SERVERS_COUNT; i++) {
+      int[] counts = new int[SERVERS_COUNT];
+      for (int j = i * SERVERS_COUNT; j < i * SERVERS_COUNT + SERVERS_COUNT; j++) {
+        counts[Integer.parseInt(list.get(j).getLocations()[0]) - 1]++;
+      }
+      for (int value : counts) {
+        assertEquals(value, 1);
+      }
+    }
+  }
+
+  /**
+   * Private comparator copied from the private JobSubmitter Hadoop class...
+   * hadoop/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+   * Used so we can run the sort done up in JobSubmitter here in tests.
+   */
+  private static class SplitComparator implements Comparator<InputSplit> {
+    @Override
+    public int compare(InputSplit o1, InputSplit o2) {
+      try {
+        return Long.compare(o1.getLength(), o2.getLength());
+      } catch (IOException|InterruptedException e) {
+        throw new RuntimeException("exception in compare", e);
+      }
+    }
+  }
+
+  /**
+   * Assert that lengths are descending. RRTIF writes lengths in descending order so any
+   * subsequent sort using the dumb SplitComparator, as is done in JobSubmitter up in Hadoop,
+   * keeps our RRTIF ordering.
+   */
+  private void assertLengthDescending(List<InputSplit> list)
+      throws IOException, InterruptedException {
+    long previousLength = Long.MAX_VALUE;
+    for (InputSplit is: list) {
+      long length = is.getLength();
+      assertTrue(previousLength + " " + length, previousLength > length);
+      previousLength = length;
+    }
+  }
+
+  /**
+   * Test that configure/unconfigure set and properly undo the HBASE_REGIONSIZECALCULATOR_ENABLE
+   * configuration.
+   */
+  @Test
+  public void testConfigureUnconfigure() {
+    Configuration configuration = HBaseConfiguration.create();
+    RoundRobinTableInputFormat rrtif = new RoundRobinTableInputFormat();
+    rrtif.setConf(configuration);
+    JobContext jobContext = Mockito.mock(JobContext.class);
+    Mockito.when(jobContext.getConfiguration()).thenReturn(configuration);
+    // Assert when done, HBASE_REGIONSIZECALCULATOR_ENABLE is still unset.
+    configuration.unset(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
+    rrtif.configure();
+    rrtif.unconfigure();
+    String value = configuration.get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
+    assertNull(value);
+    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still false when done.
+    checkRetainsBooleanValue(jobContext, rrtif, false);
+    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still true when done.
+    checkRetainsBooleanValue(jobContext, rrtif, true);
+  }
+
+  private void checkRetainsBooleanValue(JobContext jobContext, RoundRobinTableInputFormat rrtif,
+      final boolean b) {
+    jobContext.getConfiguration().
+      setBoolean(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE, b);
+    rrtif.configure();
+    rrtif.unconfigure();
+    String value = jobContext.getConfiguration().
+      get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
+    assertEquals(b, Boolean.valueOf(value));
+  }
+}
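
Usage sketch, assuming the patch above is applied: the fragment below shows one way a scan job could opt into RoundRobinTableInputFormat, either through the new TableMapReduceUtil.TABLE_INPUT_CLASS_KEY configuration key (consulted by getConfiguredInputFormat in the String-table initTableMapperJob overloads) or by naming the input format class explicitly. The driver class name, job name, table-name argument, and the choice of IdentityTableMapper are illustrative assumptions, not part of this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableMapper;
import org.apache.hadoop.hbase.mapreduce.RoundRobinTableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class RoundRobinScanJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Option 1: have the String-table initTableMapperJob overloads hand out
    // RoundRobinTableInputFormat via the new config key added by this patch.
    conf.set(TableMapReduceUtil.TABLE_INPUT_CLASS_KEY,
      RoundRobinTableInputFormat.class.getName());

    Job job = Job.getInstance(conf, "round-robin-scan");
    Scan scan = new Scan();
    scan.setCaching(500);       // Larger caching cuts down on RPCs during the scan.
    scan.setCacheBlocks(false); // Don't pollute the block cache from a full-table MR scan.

    // IdentityTableMapper simply forwards each (row, Result) pair; swap in a real mapper.
    TableMapReduceUtil.initTableMapperJob(
      args[0],                      // table name passed on the command line
      scan,
      IdentityTableMapper.class,
      ImmutableBytesWritable.class, // mapper output key
      Result.class,                 // mapper output value
      job);

    // Option 2 (equivalent): pass the input format class explicitly instead of the config key.
    // TableMapReduceUtil.initTableMapperJob(args[0], scan, IdentityTableMapper.class,
    //   ImmutableBytesWritable.class, Result.class, job, true, RoundRobinTableInputFormat.class);

    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}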