HBASE-7849 Provide administrative limits around bulkloads of files into a single region
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1567736 13f79535-47bb-0310-9956-ffa450edef68
commit 34948b947b
parent e6fa0cf92d
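The change introduces a new configuration key, hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily (exposed as LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, default 32), which caps how many HFiles a single bulk load may direct at one column family of one region. Below is a minimal sketch of how a caller could adjust the cap before running the completebulkload tool; the class name, HFile directory, table name, and the limit of 64 are illustrative assumptions, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadWithLimit {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Raise the per-region, per-family HFile cap from the default of 32 to 64.
    // Set it before constructing the tool, since the constructor reads the value.
    conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 64);

    // completebulkload arguments: <directory of staged HFiles> <table name>.
    // "/bulk/output" and "mytable" are placeholders.
    int ret = new LoadIncrementalHFiles(conf).run(new String[] { "/bulk/output", "mytable" });
    System.exit(ret);
  }
}

Setting the same key in the client-side hbase-site.xml should have the same effect, since the tool builds its Configuration from the classpath.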
LoadIncrementalHFiles.java:

@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -42,6 +43,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -68,7 +70,6 @@ import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
@@ -102,7 +103,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private HBaseAdmin hbAdmin;
 
   public static final String NAME = "completebulkload";
+  public static final String MAX_FILES_PER_REGION_PER_FAMILY
+    = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+
+  private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
 
   private boolean hasForwardedToken;
@@ -119,6 +124,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     this.hbAdmin = new HBaseAdmin(conf);
     this.userProvider = UserProvider.instantiate(conf);
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
+    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
   }
 
   private void usage() {
@@ -291,6 +297,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
             pool, queue, startEndKeys);
 
+        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+          // Error is logged inside checkHFilesCountPerRegionPerFamily.
+          throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
+            + " hfiles to one family of one region");
+        }
+
         bulkLoadPhase(table, conn, pool, queue, regionGroups);
 
         // NOTE: The next iteration's split / group could happen in parallel to
@@ -378,6 +390,31 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }
 
+  private boolean checkHFilesCountPerRegionPerFamily(
+    final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
+    for (Entry<ByteBuffer,
+      ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
+      final Collection<LoadQueueItem> lqis = e.getValue();
+      HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
+      for (LoadQueueItem lqi: lqis) {
+        MutableInt count = filesMap.get(lqi.family);
+        if (count == null) {
+          count = new MutableInt();
+          filesMap.put(lqi.family, count);
+        }
+        count.increment();
+        if (count.intValue() > maxFilesPerRegionPerFamily) {
+          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
+            + " hfiles to family " + Bytes.toStringBinary(lqi.family)
+            + " of region with start key "
+            + Bytes.toStringBinary(e.getKey()));
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
   /**
    * @return A Multimap<startkey, LoadQueueItem> that groups LQI by likely
    * bulk load region targets.
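With the check placed in doBulkLoad ahead of bulkLoadPhase, an oversized load now fails fast with an IOException instead of pushing an unbounded number of files at a single region. The sketch below shows what a programmatic caller might see, assuming an existing table and already staged HFiles; the paths, table name, and logging around the call are illustrative assumptions, while doBulkLoad, HTable, and the exception message come from the hunks above.

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadCaller {
  private static final Log LOG = LogFactory.getLog(BulkLoadCaller.class);

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "mytable");   // placeholder table name
    try {
      // Throws IOException("Trying to load more than <N> hfiles to one family of one region")
      // when any one family of any one region is targeted by more files than the configured cap.
      new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/bulk/output"), table);
    } catch (IOException e) {
      LOG.error("Bulk load rejected by the per-region, per-family limit: " + e.getMessage(), e);
    } finally {
      table.close();
    }
  }
}

The tests that follow exercise exactly this path: both test classes lower the cap to 4 and then stage 5 HFiles for one family of one region.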
TestLoadIncrementalHFiles.java:

@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapreduce;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.TreeMap;
@@ -60,7 +61,8 @@ import org.junit.experimental.categories.Category;
 public class TestLoadIncrementalHFiles {
   private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
   private static final byte[] FAMILY = Bytes.toBytes("myfam");
-  private static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
+  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
+  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;
 
   private static final byte[][] SPLIT_KEYS = new byte[][] {
     Bytes.toBytes("ddd"),
@@ -75,6 +77,9 @@ public class TestLoadIncrementalHFiles {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    util.getConfiguration().setInt(
+      LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
     util.startMiniCluster();
   }
 
@@ -355,6 +360,29 @@ public class TestLoadIncrementalHFiles {
     }
   }
 
+  @Test
+  public void testLoadTooMayHFiles() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+    byte[] from = Bytes.toBytes("begin");
+    byte[] to = Bytes.toBytes("end");
+    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
+      createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+        + i), FAMILY, QUALIFIER, from, to, 1000);
+    }
+
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String [] args= {dir.toString(), "mytable_testLoadTooMayHFiles"};
+    try {
+      loader.run(args);
+      fail("Bulk loading too many files should fail");
+    } catch (IOException ie) {
+      assertTrue(ie.getMessage().contains("Trying to load more than "
+        + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
+    }
+  }
 }
TestSecureLoadIncrementalHFiles.java:

@@ -48,6 +48,9 @@ public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles{
       HadoopSecurityEnabledUserProviderForTesting.class);
     // setup configuration
     SecureTestUtil.enableSecurity(util.getConfiguration());
+    util.getConfiguration().setInt(
+      LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
 
     util.startMiniCluster();