diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 8d99d0f8f4d..6f8396ed190 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -42,6 +43,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;

+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -68,7 +70,6 @@ import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -102,7 +103,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private HBaseAdmin hbAdmin;

   public static final String NAME = "completebulkload";
+  public static final String MAX_FILES_PER_REGION_PER_FAMILY
+    = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+
+  private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;

   private boolean hasForwardedToken;
@@ -119,6 +124,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     this.hbAdmin = new HBaseAdmin(conf);
     this.userProvider = UserProvider.instantiate(conf);
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
+    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
   }

   private void usage() {
@@ -291,6 +297,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {

       Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
           pool, queue, startEndKeys);
+      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+        // Error is logged inside checkHFilesCountPerRegionPerFamily.
+        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
+          + " hfiles to one family of one region");
+      }
+
       bulkLoadPhase(table, conn, pool, queue, regionGroups);

       // NOTE: The next iteration's split / group could happen in parallel to
@@ -378,6 +390,31 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }

+  private boolean checkHFilesCountPerRegionPerFamily(
+      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
+    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e:
+        regionGroups.asMap().entrySet()) {
+      final Collection<LoadQueueItem> lqis = e.getValue();
+      HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
+      for (LoadQueueItem lqi: lqis) {
+        MutableInt count = filesMap.get(lqi.family);
+        if (count == null) {
+          count = new MutableInt();
+          filesMap.put(lqi.family, count);
+        }
+        count.increment();
+        if (count.intValue() > maxFilesPerRegionPerFamily) {
+          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
+            + " hfiles to family " + Bytes.toStringBinary(lqi.family)
+            + " of region with start key "
+            + Bytes.toStringBinary(e.getKey()));
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   /**
    * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
    * bulk load region targets.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 37b9f3db77b..860fcdad56a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapreduce;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;

 import java.io.IOException;
 import java.util.TreeMap;
@@ -60,7 +61,8 @@ import org.junit.experimental.categories.Category;
 public class TestLoadIncrementalHFiles {
   private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
   private static final byte[] FAMILY = Bytes.toBytes("myfam");
-  private static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
+  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
+  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;

   private static final byte[][] SPLIT_KEYS = new byte[][] {
     Bytes.toBytes("ddd"),
@@ -75,6 +77,9 @@ public class TestLoadIncrementalHFiles {

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    util.getConfiguration().setInt(
+      LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
     util.startMiniCluster();
   }

@@ -355,6 +360,29 @@ public class TestLoadIncrementalHFiles {
     }
   }

+  @Test
+  public void testLoadTooMayHFiles() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+    byte[] from = Bytes.toBytes("begin");
+    byte[] to = Bytes.toBytes("end");
+    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
+      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+        + i), FAMILY, QUALIFIER, from, to, 1000);
+    }
+
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String [] args= {dir.toString(), "mytable_testLoadTooMayHFiles"};
+    try {
+      loader.run(args);
+      fail("Bulk loading too many files should fail");
+    } catch (IOException ie) {
+      assertTrue(ie.getMessage().contains("Trying to load more than "
+        + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
index 1452fac9fcb..18df7f69abd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
@@ -48,6 +48,9 @@ public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{
         HadoopSecurityEnabledUserProviderForTesting.class);
     // setup configuration
     SecureTestUtil.enableSecurity(util.getConfiguration());
+    util.getConfiguration().setInt(
+      LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);

     util.startMiniCluster();
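
Usage sketch (not part of the patch): the new limit is read once, in the LoadIncrementalHFiles constructor, from "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily" and defaults to 32. Below is a minimal, hypothetical driver that raises the limit before running the tool; the class name, the value 64, and the argument handling are illustrative assumptions, while the MAX_FILES_PER_REGION_PER_FAMILY constant and the constructor behavior come from the patch above.

    // Hypothetical driver, shown only to illustrate the new configuration knob.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadWithHigherLimit {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Default is 32 (set in the constructor change above); 64 is an arbitrary example.
        conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 64);
        // Arguments mirror the completebulkload tool: <hfile output dir> <table name>.
        System.exit(ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args));
      }
    }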