HBASE-21504 If FIFOCompactionPolicy is enabled, a compaction may write an "empty" HFile whose max timestamp is Long.MAX_VALUE. This kind of HFile will never be archived.
This commit is contained in:
parent
ad8fbf3fd3
commit
1be42994a5
|
@ -96,16 +96,29 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
|
||||||
return hasExpiredStores(storeFiles);
|
return hasExpiredStores(storeFiles);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The FIFOCompactionPolicy only choose those TTL expired HFiles as the compaction candidates. So
|
||||||
|
* if all HFiles are TTL expired, then the compaction will generate a new empty HFile. While its
|
||||||
|
* max timestamp will be Long.MAX_VALUE. If not considered separately, the HFile will never be
|
||||||
|
* archived because its TTL will be never expired. So we'll check the empty store file separately.
|
||||||
|
* (See HBASE-21504)
|
||||||
|
*/
|
||||||
|
private boolean isEmptyStoreFile(HStoreFile sf) {
|
||||||
|
return sf.getReader().getEntries() == 0;
|
||||||
|
}
|
||||||
|
|
||||||
private boolean hasExpiredStores(Collection<HStoreFile> files) {
|
private boolean hasExpiredStores(Collection<HStoreFile> files) {
|
||||||
long currentTime = EnvironmentEdgeManager.currentTime();
|
long currentTime = EnvironmentEdgeManager.currentTime();
|
||||||
for(HStoreFile sf: files){
|
for (HStoreFile sf : files) {
|
||||||
|
if (isEmptyStoreFile(sf)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
// Check MIN_VERSIONS is in HStore removeUnneededFiles
|
// Check MIN_VERSIONS is in HStore removeUnneededFiles
|
||||||
long maxTs = sf.getReader().getMaxTimestamp();
|
long maxTs = sf.getReader().getMaxTimestamp();
|
||||||
long maxTtl = storeConfigInfo.getStoreFileTtl();
|
long maxTtl = storeConfigInfo.getStoreFileTtl();
|
||||||
if (maxTtl == Long.MAX_VALUE
|
if (maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
|
||||||
|| (currentTime - maxTtl < maxTs)){
|
|
||||||
continue;
|
continue;
|
||||||
} else{
|
} else {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -116,14 +129,17 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
|
||||||
Collection<HStoreFile> filesCompacting) {
|
Collection<HStoreFile> filesCompacting) {
|
||||||
long currentTime = EnvironmentEdgeManager.currentTime();
|
long currentTime = EnvironmentEdgeManager.currentTime();
|
||||||
Collection<HStoreFile> expiredStores = new ArrayList<>();
|
Collection<HStoreFile> expiredStores = new ArrayList<>();
|
||||||
for(HStoreFile sf: files){
|
for (HStoreFile sf : files) {
|
||||||
|
if (isEmptyStoreFile(sf)) {
|
||||||
|
expiredStores.add(sf);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
// Check MIN_VERSIONS is in HStore removeUnneededFiles
|
// Check MIN_VERSIONS is in HStore removeUnneededFiles
|
||||||
long maxTs = sf.getReader().getMaxTimestamp();
|
long maxTs = sf.getReader().getMaxTimestamp();
|
||||||
long maxTtl = storeConfigInfo.getStoreFileTtl();
|
long maxTtl = storeConfigInfo.getStoreFileTtl();
|
||||||
if (maxTtl == Long.MAX_VALUE
|
if (maxTtl == Long.MAX_VALUE || (currentTime - maxTtl < maxTs)) {
|
||||||
|| (currentTime - maxTtl < maxTs)){
|
|
||||||
continue;
|
continue;
|
||||||
} else if(filesCompacting == null || !filesCompacting.contains(sf)){
|
} else if (filesCompacting == null || !filesCompacting.contains(sf)) {
|
||||||
expiredStores.add(sf);
|
expiredStores.add(sf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HStoreFile;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
import org.apache.hadoop.hbase.util.TimeOffsetEnvironmentEdge;
|
import org.apache.hadoop.hbase.util.TimeOffsetEnvironmentEdge;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
|
@ -191,4 +194,64 @@ public class TestFIFOCompactionPolicy {
|
||||||
.build();
|
.build();
|
||||||
TEST_UTIL.getAdmin().createTable(desc);
|
TEST_UTIL.getAdmin().createTable(desc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test for HBASE-21504
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testFIFOCompactionPolicyExpiredEmptyHFiles() throws Exception {
|
||||||
|
TableName tableName = TableName.valueOf("testFIFOCompactionPolicyExpiredEmptyHFiles");
|
||||||
|
TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
|
||||||
|
.setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
|
||||||
|
FIFOCompactionPolicy.class.getName())
|
||||||
|
.setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
|
||||||
|
DisabledRegionSplitPolicy.class.getName())
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(1).build())
|
||||||
|
.build();
|
||||||
|
Table table = TEST_UTIL.createTable(desc, null);
|
||||||
|
long ts = System.currentTimeMillis() - 10 * 1000;
|
||||||
|
Put put =
|
||||||
|
new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, ts, Bytes.toBytes("value0"));
|
||||||
|
table.put(put);
|
||||||
|
TEST_UTIL.getAdmin().flush(tableName); // HFile-0
|
||||||
|
put = new Put(Bytes.toBytes("row2")).addColumn(family, qualifier, ts, Bytes.toBytes("value1"));
|
||||||
|
table.put(put);
|
||||||
|
TEST_UTIL.getAdmin().flush(tableName); // HFile-1
|
||||||
|
|
||||||
|
HStore store = getStoreWithName(tableName);
|
||||||
|
Assert.assertNotNull(store);
|
||||||
|
Assert.assertEquals(2, store.getStorefilesCount());
|
||||||
|
|
||||||
|
TEST_UTIL.getAdmin().majorCompact(tableName);
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
if (store.getStorefilesCount() > 1) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertEquals(1, store.getStorefilesCount());
|
||||||
|
HStoreFile sf = store.getStorefiles().iterator().next();
|
||||||
|
Assert.assertNotNull(sf);
|
||||||
|
Assert.assertEquals(0, sf.getReader().getEntries());
|
||||||
|
|
||||||
|
put = new Put(Bytes.toBytes("row3")).addColumn(family, qualifier, ts, Bytes.toBytes("value1"));
|
||||||
|
table.put(put);
|
||||||
|
TEST_UTIL.getAdmin().flush(tableName); // HFile-2
|
||||||
|
Assert.assertEquals(2, store.getStorefilesCount());
|
||||||
|
|
||||||
|
TEST_UTIL.getAdmin().majorCompact(tableName);
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
if (store.getStorefilesCount() > 1) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(1, store.getStorefilesCount());
|
||||||
|
sf = store.getStorefiles().iterator().next();
|
||||||
|
Assert.assertNotNull(sf);
|
||||||
|
Assert.assertEquals(0, sf.getReader().getEntries());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue