HBASE-3690 Option to Exclude Bulk Import Files from Minor Compaction
Summary:
We ran an incremental scrape with HFileOutputFormat and encountered major compaction storms. This is caused by the bug in HBASE-3404. The permanent fix is a little tricky without HBASE-2856. We realized that a quicker way to avoid these compaction storms is to simply exclude bulk import files from minor compactions and let them be handled only by time-based major compactions. Added this functionality along with a config option to enable it. Rewrote this feature so that exclusion is decided on a per-bulkload basis.

Test Plan:
- mvn test -Dtest=TestHFileOutputFormat

Reviewers: stack, Kannan, JIRA, dhruba
Reviewed By: stack
CC: dhruba, lhofhansl, nspiegelberg, stack
Differential Revision: 357

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1200621 13f79535-47bb-0310-9956-ffa450edef68
parent 6c57076f7e
commit 04afdf9633
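For context, the knob this patch adds is read by HFileOutputFormat as hbase.mapreduce.hfileoutputformat.compaction.exclude (default false), as shown in the first hunk below. The following is a minimal sketch of how a bulk-load preparation job might enable it; the class name, table name, job name, and output path are placeholders, and the API usage is assumed to match the 0.92-era HFileOutputFormat / LoadIncrementalHFiles classes this patch touches, not a prescribed recipe.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ExcludeBulkLoadFromMinors {            // hypothetical driver class
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Ask HFileOutputFormat to stamp EXCLUDE_FROM_MINOR_COMPACTION into every
    // HFile it writes, so those files are rewritten only by major compactions.
    conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

    HTable table = new HTable(conf, "usertable");    // placeholder table name
    Job job = new Job(conf, "prepare-bulk-load");    // placeholder job name
    // ... configure the mapper/input for the incremental scrape here ...
    HFileOutputFormat.configureIncrementalLoad(job, table);
    Path out = new Path("/tmp/bulkload-output");     // placeholder output path
    FileOutputFormat.setOutputPath(job, out);

    if (job.waitForCompletion(true)) {
      // Move the produced HFiles into the table; the exclusion flag travels
      // with each file's FileInfo metadata.
      new LoadIncrementalHFiles(conf).doBulkLoad(out, table);
    }
  }
}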
@@ -90,6 +90,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
     // Invented config.  Add to hbase-*.xml if other than default compression.
     final String defaultCompression = conf.get("hfile.compression",
         Compression.Algorithm.NONE.getName());
+    final boolean compactionExclude = conf.getBoolean(
+        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
 
     // create a map from column family to the compression algorithm
     final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
@@ -186,6 +188,8 @@ public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable,
             Bytes.toBytes(context.getTaskAttemptID().toString()));
         w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
             Bytes.toBytes(true));
+        w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+            Bytes.toBytes(compactionExclude));
         w.appendFileInfo(StoreFile.TIMERANGE_KEY,
             WritableUtils.toByteArray(trt));
         w.close();

@@ -62,6 +62,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -1092,6 +1094,14 @@ public class Store implements HeapSize {
       return Collections.emptyList();
     }
 
+    // remove bulk import files that request to be excluded from minors
+    filesToCompact.removeAll(Collections2.filter(filesToCompact,
+        new Predicate<StoreFile>() {
+          public boolean apply(StoreFile input) {
+            return input.excludeFromMinorCompaction();
+          }
+        }));
+
     /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
     // Sort files by size to correct when normal skew is altered by bulk load.
     Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);

@@ -109,6 +109,10 @@ public class StoreFile {
   public static final byte[] MAJOR_COMPACTION_KEY =
       Bytes.toBytes("MAJOR_COMPACTION_KEY");
 
+  /** Minor compaction flag in FileInfo */
+  public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
+      Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
+
   /** Bloom filter Type in FileInfo */
   static final byte[] BLOOM_FILTER_TYPE_KEY =
       Bytes.toBytes("BLOOM_FILTER_TYPE");
@@ -155,6 +159,10 @@ public class StoreFile {
   // whenever you get a Reader.
   private AtomicBoolean majorCompaction = null;
 
+  // If true, this file should not be included in minor compactions.
+  // It's set whenever you get a Reader.
+  private boolean excludeFromMinorCompaction = false;
+
   /** Meta key set when store file is a result of a bulk load */
   public static final byte[] BULKLOAD_TASK_KEY =
       Bytes.toBytes("BULKLOAD_SOURCE_TASK");
@@ -313,6 +321,13 @@ public class StoreFile {
     return this.majorCompaction.get();
   }
 
+  /**
+   * @return True if this file should not be part of a minor compaction.
+   */
+  boolean excludeFromMinorCompaction() {
+    return this.excludeFromMinorCompaction;
+  }
+
   /**
    * @return This files maximum edit sequence id.
    */
@@ -487,6 +502,9 @@ public class StoreFile {
       this.majorCompaction = new AtomicBoolean(false);
     }
 
+    b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
+    this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
+
     BloomType hfileBloomType = reader.getBloomFilterType();
     if (cfBloomType != BloomType.NONE) {
       reader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);

@@ -33,6 +33,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.Callable;
 import java.util.Random;
 
 import junit.framework.Assert;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -54,8 +56,10 @@ import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.NullWritable;
@@ -235,7 +239,7 @@ public class TestHFileOutputFormat {
    * metadata used by time-restricted scans.
    */
   @Test
-  public void test_TIMERANGE() throws Exception {
+  public void test_TIMERANGE() throws Exception {
     Configuration conf = new Configuration(this.util.getConfiguration());
     RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
     TaskAttemptContext context = null;
@@ -679,6 +683,95 @@ public class TestHFileOutputFormat {
     }
   }
 
+  @Test
+  public void testExcludeMinorCompaction() throws Exception {
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hbase.hstore.compaction.min", 2);
+    Path testDir = util.getDataTestDir("testExcludeMinorCompaction");
+    byte[][] startKeys = generateRandomStartKeys(5);
+
+    try {
+      util.startMiniCluster();
+      final FileSystem fs = util.getDFSCluster().getFileSystem();
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      HTable table = util.createTable(TABLE_NAME, FAMILIES);
+      assertEquals("Should start with empty table", 0, util.countRows(table));
+
+      // deep inspection: get the StoreFile dir
+      final Path storePath = Store.getStoreHomedir(
+          HTableDescriptor.getTableDir(FSUtils.getRootDir(conf), TABLE_NAME),
+          admin.getTableRegions(TABLE_NAME).get(0).getEncodedName(),
+          FAMILIES[0]);
+      assertEquals(0, fs.listStatus(storePath).length);
+
+      // put some data in it and flush to create a storefile
+      Put p = new Put(Bytes.toBytes("test"));
+      p.add(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
+      table.put(p);
+      admin.flush(TABLE_NAME);
+      assertEquals(1, util.countRows(table));
+      quickPoll(new Callable<Boolean>() {
+        public Boolean call() throws Exception {
+          return fs.listStatus(storePath).length == 1;
+        }
+      }, 5000);
+
+      // Generate a bulk load file with more rows
+      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
+          true);
+      util.startMiniMapReduceCluster();
+      runIncrementalPELoad(conf, table, testDir);
+
+      // Perform the actual load
+      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);
+
+      // Ensure data shows up
+      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
+      assertEquals("LoadIncrementalHFiles should put expected data in table",
+          expectedRows + 1, util.countRows(table));
+
+      // should have a second StoreFile now
+      assertEquals(2, fs.listStatus(storePath).length);
+
+      // minor compactions shouldn't get rid of the file
+      admin.compact(TABLE_NAME);
+      try {
+        quickPoll(new Callable<Boolean>() {
+          public Boolean call() throws Exception {
+            return fs.listStatus(storePath).length == 1;
+          }
+        }, 5000);
+        throw new IOException("SF# = " + fs.listStatus(storePath).length);
+      } catch (AssertionError ae) {
+        // this is expected behavior
+      }
+
+      // a major compaction should work though
+      admin.majorCompact(TABLE_NAME);
+      quickPoll(new Callable<Boolean>() {
+        public Boolean call() throws Exception {
+          return fs.listStatus(storePath).length == 1;
+        }
+      }, 5000);
+
+    } finally {
+      util.shutdownMiniMapReduceCluster();
+      util.shutdownMiniCluster();
+    }
+  }
+
+  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
+    int sleepMs = 10;
+    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
+    while (retries-- > 0) {
+      if (c.call().booleanValue()) {
+        return;
+      }
+      Thread.sleep(sleepMs);
+    }
+    fail();
+  }
+
   public static void main(String args[]) throws Exception {
     new TestHFileOutputFormat().manualTest(args);
   }