HBASE-27305 add an option to skip file splitting when bulkload hfiles (#4709)
Co-authored-by: huiruan <huiruan@tencent.com> Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
f238a92b6d
commit
00a719e76f
|
@ -124,6 +124,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
|
||||||
*/
|
*/
|
||||||
public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
|
public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
|
||||||
|
|
||||||
|
public static final String FAIL_IF_NEED_SPLIT_HFILE =
|
||||||
|
"hbase.loadincremental.fail.if.need.split.hfile";
|
||||||
|
|
||||||
// We use a '.' prefix which is ignored when walking directory trees
|
// We use a '.' prefix which is ignored when walking directory trees
|
||||||
// above. It is invalid family name.
|
// above. It is invalid family name.
|
||||||
static final String TMP_DIR = ".tmp";
|
static final String TMP_DIR = ".tmp";
|
||||||
|
@ -141,6 +144,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
|
||||||
|
|
||||||
private List<String> clusterIds = new ArrayList<>();
|
private List<String> clusterIds = new ArrayList<>();
|
||||||
private boolean replicate = true;
|
private boolean replicate = true;
|
||||||
|
private boolean failIfNeedSplitHFile = false;
|
||||||
|
|
||||||
public BulkLoadHFilesTool(Configuration conf) {
|
public BulkLoadHFilesTool(Configuration conf) {
|
||||||
// make a copy, just to be sure we're not overriding someone else's config
|
// make a copy, just to be sure we're not overriding someone else's config
|
||||||
|
@ -159,6 +163,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
|
||||||
nrThreads =
|
nrThreads =
|
||||||
conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
|
conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
|
||||||
bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
|
bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
|
||||||
|
failIfNeedSplitHFile = conf.getBoolean(FAIL_IF_NEED_SPLIT_HFILE, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize a thread pool
|
// Initialize a thread pool
|
||||||
|
@ -699,6 +704,11 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
|
||||||
Bytes.compareTo(last.get(), startEndKeys.get(firstKeyRegionIdx).getSecond()) < 0 || Bytes
|
Bytes.compareTo(last.get(), startEndKeys.get(firstKeyRegionIdx).getSecond()) < 0 || Bytes
|
||||||
.equals(startEndKeys.get(firstKeyRegionIdx).getSecond(), HConstants.EMPTY_BYTE_ARRAY);
|
.equals(startEndKeys.get(firstKeyRegionIdx).getSecond(), HConstants.EMPTY_BYTE_ARRAY);
|
||||||
if (!lastKeyInRange) {
|
if (!lastKeyInRange) {
|
||||||
|
if (failIfNeedSplitHFile) {
|
||||||
|
throw new IOException(
|
||||||
|
"The key range of hfile=" + hfilePath + " fits into no region. " + "And because "
|
||||||
|
+ FAIL_IF_NEED_SPLIT_HFILE + " was set to true, we just skip the next steps.");
|
||||||
|
}
|
||||||
int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
|
int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
|
||||||
int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) / 2;
|
int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) / 2;
|
||||||
// make sure the splitPoint is valid in case region overlap occur, maybe the splitPoint bigger
|
// make sure the splitPoint is valid in case region overlap occur, maybe the splitPoint bigger
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.tool;
|
||||||
import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
|
import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -782,4 +783,29 @@ public class TestBulkLoadHFiles {
|
||||||
util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
|
util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFailIfNeedSplitHFile() throws IOException {
|
||||||
|
TableName tableName = TableName.valueOf(tn.getMethodName());
|
||||||
|
Table table = util.createTable(tableName, FAMILY);
|
||||||
|
|
||||||
|
util.loadTable(table, FAMILY);
|
||||||
|
|
||||||
|
FileSystem fs = util.getTestFileSystem();
|
||||||
|
Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file"));
|
||||||
|
HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER,
|
||||||
|
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
|
||||||
|
|
||||||
|
util.getAdmin().split(tableName);
|
||||||
|
util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1);
|
||||||
|
|
||||||
|
Configuration config = new Configuration(util.getConfiguration());
|
||||||
|
config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true);
|
||||||
|
BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config);
|
||||||
|
|
||||||
|
String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() };
|
||||||
|
assertThrows(IOException.class, () -> tool.run(args));
|
||||||
|
util.getHBaseCluster().getRegions(tableName)
|
||||||
|
.forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue