HBASE-21504 If enable FIFOCompactionPolicy, a compaction may write a "empty" hfile whose maxTimeStamp is long max. This kind of hfile will never be archived.
This commit is contained in:
parent
3ad2a89fdf
commit
d42e0ade1c
|
@ -96,16 +96,29 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
|
|||
return hasExpiredStores(storeFiles);
|
||||
}
|
||||
|
||||
/**
|
||||
* The FIFOCompactionPolicy only choose those TTL expired HFiles as the compaction candidates. So
|
||||
* if all HFiles are TTL expired, then the compaction will generate a new empty HFile. While its
|
||||
* max timestamp will be Long.MAX_VALUE. If not considered separately, the HFile will never be
|
||||
* archived because its TTL will be never expired. So we'll check the empty store file separately.
|
||||
* (See HBASE-21504)
|
||||
*/
|
||||
private boolean isEmptyStoreFile(HStoreFile sf) {
|
||||
return sf.getReader().getEntries() == 0;
|
||||
}
|
||||
|
||||
private boolean hasExpiredStores(Collection<HStoreFile> files) {
|
||||
long currentTime = EnvironmentEdgeManager.currentTime();
|
||||
for(HStoreFile sf: files){
|
||||
for (HStoreFile sf : files) {
|
||||
if (isEmptyStoreFile(sf)) {
|
||||
return true;
|
||||
}
|
||||
// Check MIN_VERSIONS is in HStore removeUnneededFiles
|
||||
long maxTs = sf.getReader().getMaxTimestamp();
|
||||
long maxTtl = storeConfigInfo.getStoreFileTtl();
|
||||
if (maxTtl == Long.MAX_VALUE
|
||||
|| (currentTime - maxTtl < maxTs)){
|
||||
continue;
|
||||
} else{
|
||||
if (maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
|
||||
continue;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -116,14 +129,17 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
|
|||
Collection<HStoreFile> filesCompacting) {
|
||||
long currentTime = EnvironmentEdgeManager.currentTime();
|
||||
Collection<HStoreFile> expiredStores = new ArrayList<>();
|
||||
for(HStoreFile sf: files){
|
||||
for (HStoreFile sf : files) {
|
||||
if (isEmptyStoreFile(sf)) {
|
||||
expiredStores.add(sf);
|
||||
continue;
|
||||
}
|
||||
// Check MIN_VERSIONS is in HStore removeUnneededFiles
|
||||
long maxTs = sf.getReader().getMaxTimestamp();
|
||||
long maxTtl = storeConfigInfo.getStoreFileTtl();
|
||||
if (maxTtl == Long.MAX_VALUE
|
||||
|| (currentTime - maxTtl < maxTs)){
|
||||
continue;
|
||||
} else if(filesCompacting == null || !filesCompacting.contains(sf)){
|
||||
if (maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
|
||||
continue;
|
||||
} else if (filesCompacting == null || !filesCompacting.contains(sf)) {
|
||||
expiredStores.add(sf);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
|
|||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.util.TimeOffsetEnvironmentEdge;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
|
@ -191,4 +194,64 @@ public class TestFIFOCompactionPolicy {
|
|||
.build();
|
||||
TEST_UTIL.getAdmin().createTable(desc);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unit test for HBASE-21504
|
||||
*/
|
||||
@Test
|
||||
public void testFIFOCompactionPolicyExpiredEmptyHFiles() throws Exception {
|
||||
TableName tableName = TableName.valueOf("testFIFOCompactionPolicyExpiredEmptyHFiles");
|
||||
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
|
||||
FIFOCompactionPolicy.class.getName())
|
||||
.setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
|
||||
DisabledRegionSplitPolicy.class.getName())
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(1).build())
|
||||
.build();
|
||||
Table table = TEST_UTIL.createTable(desc, null);
|
||||
long ts = System.currentTimeMillis() - 10 * 1000;
|
||||
Put put =
|
||||
new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, ts, Bytes.toBytes("value0"));
|
||||
table.put(put);
|
||||
TEST_UTIL.getAdmin().flush(tableName); // HFile-0
|
||||
put = new Put(Bytes.toBytes("row2")).addColumn(family, qualifier, ts, Bytes.toBytes("value1"));
|
||||
table.put(put);
|
||||
TEST_UTIL.getAdmin().flush(tableName); // HFile-1
|
||||
|
||||
HStore store = getStoreWithName(tableName);
|
||||
Assert.assertNotNull(store);
|
||||
Assert.assertEquals(2, store.getStorefilesCount());
|
||||
|
||||
TEST_UTIL.getAdmin().majorCompact(tableName);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
if (store.getStorefilesCount() > 1) {
|
||||
Thread.sleep(100);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Assert.assertEquals(1, store.getStorefilesCount());
|
||||
HStoreFile sf = store.getStorefiles().iterator().next();
|
||||
Assert.assertNotNull(sf);
|
||||
Assert.assertEquals(0, sf.getReader().getEntries());
|
||||
|
||||
put = new Put(Bytes.toBytes("row3")).addColumn(family, qualifier, ts, Bytes.toBytes("value1"));
|
||||
table.put(put);
|
||||
TEST_UTIL.getAdmin().flush(tableName); // HFile-2
|
||||
Assert.assertEquals(2, store.getStorefilesCount());
|
||||
|
||||
TEST_UTIL.getAdmin().majorCompact(tableName);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
if (store.getStorefilesCount() > 1) {
|
||||
Thread.sleep(100);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals(1, store.getStorefilesCount());
|
||||
sf = store.getStorefiles().iterator().next();
|
||||
Assert.assertNotNull(sf);
|
||||
Assert.assertEquals(0, sf.getReader().getEntries());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue