HBASE-15808 Reduce potential bulk load intermediate space usage and waste

Jerry He 2016-05-12 15:43:48 -07:00
parent 1267f76e9a
commit acca95fb50
2 changed files with 61 additions and 3 deletions

hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java

@@ -118,6 +118,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
 
+  // We use a '.' prefix, which is ignored when walking directory trees
+  // above: it is an invalid family name.
+  final static String TMP_DIR = ".tmp";
+
   private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
@@ -201,6 +205,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }
       Path familyDir = familyStat.getPath();
       byte[] familyName = familyDir.getName().getBytes();
+      // Skip invalid families (this also skips the '.tmp' directory above)
+      try {
+        HColumnDescriptor.isLegalFamilyName(familyName);
+      }
+      catch (IllegalArgumentException e) {
+        LOG.warn("Skipping invalid " + familyStat.getPath());
+        continue;
+      }
       TFamily family = visitor.bulkFamily(familyName);
 
       FileStatus[] hfileStatuses = fs.listStatus(familyDir);
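Taken together, these two hunks explain why no extra filtering is needed for the new .tmp directory: HColumnDescriptor.isLegalFamilyName (a real HBase static, used above) throws IllegalArgumentException for any name beginning with '.', so the directory walk skips .tmp like any other invalid family. A minimal standalone sketch of that behavior, assuming hbase-client on the classpath (the wrapper class and main method are illustrative only):

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.util.Bytes;

public class TmpDirIsSkippedSketch {
  public static void main(String[] args) {
    // ".tmp" starts with '.', which isLegalFamilyName rejects, so the
    // bulk-load directory walk treats it as an invalid family and skips it.
    try {
      HColumnDescriptor.isLegalFamilyName(Bytes.toBytes(".tmp"));
      System.out.println("\".tmp\" accepted (unexpected)");
    } catch (IllegalArgumentException e) {
      System.out.println("\".tmp\" rejected: " + e.getMessage());
    }
  }
}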
@@ -632,9 +644,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       byte[] splitKey) throws IOException {
     final Path hfilePath = item.hfilePath;
 
-    // We use a '_' prefix which is ignored when walking directory trees
-    // above.
-    final String TMP_DIR = "_tmp";
     Path tmpDir = item.hfilePath.getParent();
     if (!tmpDir.getName().equals(TMP_DIR)) {
       tmpDir = new Path(tmpDir, TMP_DIR);
@@ -661,6 +670,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     lqis.add(new LoadQueueItem(item.family, botOut));
     lqis.add(new LoadQueueItem(item.family, topOut));
+    // If the current item is already the result of previous splits,
+    // we don't need it anymore. Clean up to save space.
+    // It is not part of the original input files.
+    try {
+      tmpDir = item.hfilePath.getParent();
+      if (tmpDir.getName().equals(TMP_DIR)) {
+        fs.delete(item.hfilePath, false);
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to delete temporary split file " + item.hfilePath);
+    }
     LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
     return lqis;
   }
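The guard on the parent directory is what separates original input HFiles from intermediate split products: splitStoreFile writes its top/bottom halves under .tmp, so when one of those halves later needs a further split, its parent is .tmp and it can be deleted safely, while files supplied by the user sit directly under a family directory and are never touched. A minimal sketch of the same guard using Hadoop's FileSystem API (the class and method names here are illustrative, not from the patch):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SplitCleanupSketch {
  static final String TMP_DIR = ".tmp";

  // Delete an HFile after splitting it, but only if it was itself an
  // intermediate split product, i.e. it lives directly under a .tmp
  // directory. Original user input files are never deleted.
  static void cleanupIfIntermediate(FileSystem fs, Path hfile) {
    try {
      if (TMP_DIR.equals(hfile.getParent().getName())) {
        fs.delete(hfile, false); // false: non-recursive, it is a single file
      }
    } catch (IOException e) {
      // Best effort, mirroring the patch: a failed cleanup only wastes space.
      System.err.println("Unable to delete temporary split file " + hfile);
    }
  }
}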

hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MapReduceTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -424,6 +426,42 @@ public class TestLoadIncrementalHFilesSplitRecovery {
     }
   }
 
+  /**
+   * This test creates a table with many small regions. The bulk load files
+   * will be split multiple times before all of them can be loaded successfully.
+   */
+  @Test (timeout=120000)
+  public void testSplitTmpFileCleanUp() throws Exception {
+    final TableName table = TableName.valueOf("splitTmpFileCleanUp");
+    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
+        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"),
+        Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050") };
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
+
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
+
+      // create HFiles
+      Path bulk = buildBulkFiles(table, 2);
+      try (Table t = connection.getTable(table);
+          RegionLocator locator = connection.getRegionLocator(table);
+          Admin admin = connection.getAdmin()) {
+        lih.doBulkLoad(bulk, admin, t, locator);
+      }
+      // family path
+      Path tmpPath = new Path(bulk, family(0));
+      // TMP_DIR under family path
+      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
+      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
+      // HFiles have been split, so TMP_DIR exists
+      assertTrue(fs.exists(tmpPath));
+      // TMP_DIR should have been cleaned up
+      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
+          FSUtils.listStatus(fs, tmpPath));
+      assertExpectedTable(connection, table, ROWCOUNT, 2);
+    }
+  }
+
   /**
    * This simulates a remote exception which should cause LIHF to exit with an
    * exception.
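A note on the assertNull check in testSplitTmpFileCleanUp above: it relies on HBase's FSUtils.listStatus, which returns null when a directory has no entries (or does not exist), rather than the empty array or FileNotFoundException you would get from Hadoop's FileSystem.listStatus. A minimal sketch of that convention against the local filesystem (the path below is made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.FSUtils;

public class ListStatusSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path dir = new Path("/tmp/empty-dir-sketch"); // hypothetical path
    fs.mkdirs(dir);
    // FSUtils.listStatus returns null for an empty directory,
    // which is exactly what the test's assertNull depends on.
    FileStatus[] entries = FSUtils.listStatus(fs, dir);
    System.out.println(entries == null ? "empty" : entries.length + " entries");
  }
}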