HBASE-21905: [FIFOCompaction] Don't compact already inflight store files (#1829)
This one surfaced as a flaky test but turns out to be a legit bug in the FIFOCompaction code: FIFOCompactionPolicy does not check whether an empty store file is already being compacted by an in-flight compaction request and enqueues it again, so it can race with the running compaction (as in this test case; see the JIRA for the exact exception). Fixes the edge case and cleans up the test code a bit.

Signed-off-by: Andrew Purtell <apurtell@apache.org>
commit 60c9ae58fb
parent 0078268203
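The gist of the fix, as a minimal standalone sketch: when collecting empty store files as compaction candidates, skip any file that an in-flight compaction request already owns, so the policy never enqueues it twice. This is NOT the actual FIFOCompactionPolicy code; StoreFile and selectEmptyCandidates are simplified stand-ins, and only the emptiness check and the filesCompacting guard mirror the real change in the diff below.

    // Sketch only: simplified types, not the HBase implementation.
    import java.util.ArrayList;
    import java.util.Collection;

    final class FifoCandidateSelectionSketch {

      /** Simplified stand-in for org.apache.hadoop.hbase.regionserver.HStoreFile. */
      interface StoreFile {
        long getEntries();
      }

      /**
       * Picks the empty store files that may be compacted away, excluding any file
       * that is already part of an in-flight compaction request. Without the
       * second condition the same file can be enqueued twice and race with the
       * running compaction.
       */
      static Collection<StoreFile> selectEmptyCandidates(Collection<StoreFile> files,
          Collection<StoreFile> filesCompacting) {
        Collection<StoreFile> candidates = new ArrayList<>();
        for (StoreFile sf : files) {
          if (sf.getEntries() == 0 && !filesCompacting.contains(sf)) {
            candidates.add(sf);
          }
        }
        return candidates;
      }
    }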
@@ -97,11 +97,11 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
   }
 
   /**
-   * The FIFOCompactionPolicy only choose those TTL expired HFiles as the compaction candidates. So
-   * if all HFiles are TTL expired, then the compaction will generate a new empty HFile. While its
-   * max timestamp will be Long.MAX_VALUE. If not considered separately, the HFile will never be
-   * archived because its TTL will be never expired. So we'll check the empty store file separately.
-   * (See HBASE-21504)
+   * The FIFOCompactionPolicy only choose the TTL expired store files as the compaction candidates.
+   * If all the store files are TTL expired, then the compaction will generate a new empty file.
+   * While its max timestamp will be Long.MAX_VALUE. If not considered separately, the store file
+   * will never be archived because its TTL will be never expired. So we'll check the empty store
+   * file separately (See HBASE-21504).
    */
   private boolean isEmptyStoreFile(HStoreFile sf) {
     return sf.getReader().getEntries() == 0;
@@ -130,7 +130,7 @@ public class FIFOCompactionPolicy extends ExploringCompactionPolicy {
     long currentTime = EnvironmentEdgeManager.currentTime();
     Collection<HStoreFile> expiredStores = new ArrayList<>();
     for (HStoreFile sf : files) {
-      if (isEmptyStoreFile(sf)) {
+      if (isEmptyStoreFile(sf) && !filesCompacting.contains(sf)) {
         expiredStores.add(sf);
         continue;
       }
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -58,6 +59,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 @Category({ RegionServerTests.class, MediumTests.class })
 public class TestFIFOCompactionPolicy {
@@ -216,23 +218,18 @@ public class TestFIFOCompactionPolicy {
     TEST_UTIL.getAdmin().flush(tableName); // HFile-0
     put = new Put(Bytes.toBytes("row2")).addColumn(family, qualifier, ts, Bytes.toBytes("value1"));
     table.put(put);
+    final int testWaitTimeoutMs = 20000;
     TEST_UTIL.getAdmin().flush(tableName); // HFile-1
 
-    HStore store = getStoreWithName(tableName);
-    Assert.assertNotNull(store);
+    HStore store = Preconditions.checkNotNull(getStoreWithName(tableName));
     Assert.assertEquals(2, store.getStorefilesCount());
 
     TEST_UTIL.getAdmin().majorCompact(tableName);
-    for (int i = 0; i < 100; i++) {
-      if (store.getStorefilesCount() > 1) {
-        Thread.sleep(100);
-      } else {
-        break;
-      }
-    }
+    TEST_UTIL.waitFor(testWaitTimeoutMs,
+      (Waiter.Predicate<Exception>) () -> store.getStorefilesCount() == 1);
+
     Assert.assertEquals(1, store.getStorefilesCount());
-    HStoreFile sf = store.getStorefiles().iterator().next();
-    Assert.assertNotNull(sf);
+    HStoreFile sf = Preconditions.checkNotNull(store.getStorefiles().iterator().next());
     Assert.assertEquals(0, sf.getReader().getEntries());
 
     put = new Put(Bytes.toBytes("row3")).addColumn(family, qualifier, ts, Bytes.toBytes("value1"));
@@ -241,17 +238,11 @@ public class TestFIFOCompactionPolicy {
     Assert.assertEquals(2, store.getStorefilesCount());
 
     TEST_UTIL.getAdmin().majorCompact(tableName);
-    for (int i = 0; i < 100; i++) {
-      if (store.getStorefilesCount() > 1) {
-        Thread.sleep(100);
-      } else {
-        break;
-      }
-    }
+    TEST_UTIL.waitFor(testWaitTimeoutMs,
+      (Waiter.Predicate<Exception>) () -> store.getStorefilesCount() == 1);
 
     Assert.assertEquals(1, store.getStorefilesCount());
-    sf = store.getStorefiles().iterator().next();
-    Assert.assertNotNull(sf);
+    sf = Preconditions.checkNotNull(store.getStorefiles().iterator().next());
     Assert.assertEquals(0, sf.getReader().getEntries());
   }
 }
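For the test cleanup, the hand-rolled sleep/poll loops are replaced with HBaseTestingUtility.waitFor plus a Waiter.Predicate and a bounded 20-second timeout, as shown in the hunks above. Below is a generic, self-contained sketch of that wait-for-condition pattern with no HBase dependencies; all names here are illustrative, not HBase API.

    // Generic wait-for-condition helper, illustrating the pattern the test now
    // uses via HBaseTestingUtility.waitFor / Waiter.Predicate.
    import java.util.concurrent.atomic.AtomicInteger;

    final class WaitForSketch {

      @FunctionalInterface
      interface Condition {
        boolean evaluate() throws Exception;
      }

      /** Polls the condition every 100 ms until it holds or the timeout elapses. */
      static void waitFor(long timeoutMs, Condition condition) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.evaluate()) {
          if (System.currentTimeMillis() >= deadline) {
            throw new AssertionError("condition not met within " + timeoutMs + " ms");
          }
          Thread.sleep(100);
        }
      }

      public static void main(String[] args) throws Exception {
        // Example: wait up to 20 seconds (the timeout used by the test) for a
        // simulated store file count to drop to 1.
        AtomicInteger storeFileCount = new AtomicInteger(1);
        waitFor(20000, () -> storeFileCount.get() == 1);
      }
    }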