diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index bb8c9b83661..441b18b3302 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -27,9 +27,7 @@ import java.io.StringWriter; import java.util.Comparator; import java.util.Iterator; import java.util.Optional; -import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; @@ -94,7 +92,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati private volatile ThreadPoolExecutor splits; private volatile ThroughputController compactionThroughputController; - private volatile Set underCompactionStores = ConcurrentHashMap.newKeySet(); private volatile boolean compactionsEnabled; /** @@ -116,15 +113,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati CompactionThroughputControllerFactory.create(server, conf); } - // only for test - public CompactSplit(Configuration conf) { - this.server = null; - this.conf = conf; - this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true); - createCompactionExecutors(); - createSplitExcecutors(); - } - private void createSplitExcecutors() { final String n = Thread.currentThread().getName(); int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); @@ -250,8 +238,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati createCompactionExecutors(); } - // set protected for test - protected interface CompactionCompleteTracker { + private interface CompactionCompleteTracker { default void completed(Store store) { } @@ -329,8 +316,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } } - // set protected for test - protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority, + private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, boolean selectNow, CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) throws IOException { if (this.server.isStopped() || (region.getTableDescriptor() != null && @@ -378,12 +364,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } pool.execute( new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); - if (LOG.isDebugEnabled()) { - LOG.debug("Add compact mark for store {}, priority={}, current under compaction " - + "store size is {}", getStoreNameForUnderCompaction(store), priority, - underCompactionStores.size()); - } - underCompactionStores.add(getStoreNameForUnderCompaction(store)); region.incrementCompactionsQueuedCount(); if (LOG.isDebugEnabled()) { String type = (pool == shortCompactions) ? "Small " : "Large "; @@ -397,18 +377,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati DUMMY_COMPLETE_TRACKER, null); } - public void requestSystemCompaction(HRegion region, HStore store, String why) + public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException { - requestSystemCompaction(region, store, why, false); - } - - public synchronized void requestSystemCompaction(HRegion region, HStore store, String why, - boolean giveUpIfRequestedOrCompacting) throws IOException { - if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) { - LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region, - store.getColumnFamilyName()); - return; - } requestCompactionInternal(region, store, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); } @@ -501,13 +471,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati return this.regionSplitLimit; } - /** - * Check if this store is under compaction - */ - public boolean isUnderCompaction(final HStore s) { - return underCompactionStores.contains(getStoreNameForUnderCompaction(s)); - } - private static final Comparator COMPARATOR = new Comparator() { @@ -687,22 +650,13 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati @Override public void run() { - try { - Preconditions.checkNotNull(server); - if (server.isStopped() || (region.getTableDescriptor() != null && - !region.getTableDescriptor().isCompactionEnabled())) { - region.decrementCompactionsQueuedCount(); - return; - } - doCompaction(user); - } finally { - if (LOG.isDebugEnabled()) { - LOG.debug("Remove under compaction mark for store: {}", - store.getHRegion().getRegionInfo().getEncodedName() + ":" + store - .getColumnFamilyName()); - } - underCompactionStores.remove(getStoreNameForUnderCompaction(store)); + Preconditions.checkNotNull(server); + if (server.isStopped() || (region.getTableDescriptor() != null && + !region.getTableDescriptor().isCompactionEnabled())) { + region.decrementCompactionsQueuedCount(); + return; } + doCompaction(user); } private String formatStackTrace(Exception ex) { @@ -870,10 +824,4 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati return shortCompactions; } - private String getStoreNameForUnderCompaction(HStore store) { - return String.format("%s:%s", - store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "", - store.getColumnFamilyName()); - } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b534f684740..dd9720c75a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; - import edu.umd.cs.findbugs.annotations.Nullable; import io.opentelemetry.api.trace.Span; import java.io.EOFException; @@ -69,7 +68,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -179,7 +177,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -7053,10 +7050,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi HStore store = getStore(family); try { if (this.rsServices != null && store.needsCompaction()) { - this.rsServices.getCompactionRequestor().requestSystemCompaction(this, store, - "bulkload hfiles request compaction", true); - LOG.info("Request compaction for region {} family {} after bulk load", - this.getRegionInfo().getEncodedName(), store.getColumnFamilyName()); + this.rsServices.getCompactionRequestor().requestCompaction(this, store, + "bulkload hfiles request compaction", Store.PRIORITY_USER + 1, + CompactionLifeCycleTracker.DUMMY, null); + LOG.debug("bulkload hfiles request compaction region : {}, family : {}", + this.getRegionInfo(), family); } } catch (IOException e) { LOG.error("bulkload hfiles request compaction error ", e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java index 31a7ca7ea4e..e5f536007e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java @@ -44,12 +44,6 @@ public interface CompactionRequester { void requestCompaction(HRegion region, HStore store, String why, int priority, CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException; - /** - * Request system compaction on the given store. - */ - void requestSystemCompaction(HRegion region, HStore store, String why, - boolean giveUpIfRequestedOrCompacting) throws IOException; - /** * on/off compaction */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java index b17995a591e..c736513e97a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java @@ -18,49 +18,49 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE; - -import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.hamcrest.MockitoHamcrest.argThat; + import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @Category(SmallTests.class) public class TestCompactionAfterBulkLoad extends TestBulkloadBase { + private final RegionServerServices regionServerServices = mock(RegionServerServices.class); + private final CompactionRequester compactionRequester = mock(CompactSplit.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class); - private final RegionServerServices regionServerServices = mock(RegionServerServices.class); - public static AtomicInteger called = new AtomicInteger(0); - @Override protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName, byte[]... families) throws IOException { @@ -79,9 +79,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase { } @Test - public void shouldRequestCompactAllStoresAfterBulkLoad() throws IOException { - final CompactSplit compactSplit = new TestCompactSplit(HBaseConfiguration.create()); - called.set(0); + public void shouldRequestCompactionAfterBulkLoad() throws IOException { List> familyPaths = new ArrayList<>(); // enough hfile to request compaction for (int i = 0; i < 5; i++) { @@ -90,7 +88,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase { try { conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true); when(regionServerServices.getConfiguration()).thenReturn(conf); - when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit); + when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester); when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) .thenAnswer(new Answer() { @Override @@ -105,77 +103,14 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase { } }); - HRegion region = testRegionWithFamilies(family1, family2, family3); - region.bulkLoadHFiles(familyPaths, false, null); - assertEquals(3, called.get()); + Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(), + any(), any()); + testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null); + // invoke three times for 3 families + verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class), + isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null)); } finally { conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false); } } - - @Test - public void testAvoidRepeatedlyRequestCompactAfterBulkLoad() throws IOException { - final CompactSplit compactSplit = new TestFamily1UnderCompact(HBaseConfiguration.create()); - called.set(0); - List> familyPaths = new ArrayList<>(); - // enough hfile to request compaction - for (int i = 0; i < 5; i++) { - familyPaths.addAll(withFamilyPathsFor(family1, family2, family3)); - } - try { - conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true); - when(regionServerServices.getConfiguration()).thenReturn(conf); - when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit); - when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) - .thenAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) { - WALKeyImpl walKey = invocation.getArgument(1); - MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); - if (mvcc != null) { - MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); - walKey.setWriteEntry(we); - } - return 01L; - } - }); - - HRegion region = testRegionWithFamilies(family1, family2, family3); - region.bulkLoadHFiles(familyPaths, false, null); - // invoke three times for 2 families - assertEquals(2, called.get()); - } finally { - conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false); - } - } - - private class TestCompactSplit extends CompactSplit { - - TestCompactSplit(Configuration conf) { - super(conf); - } - - @Override - protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority, - boolean selectNow, CompactionLifeCycleTracker tracker, - CompactionCompleteTracker completeTracker, User user) throws IOException { - called.addAndGet(1); - } - } - - private class TestFamily1UnderCompact extends TestCompactSplit { - - TestFamily1UnderCompact(Configuration conf) { - super(conf); - } - - @Override - public boolean isUnderCompaction(final HStore s) { - if (s.getColumnFamilyName().equals(Bytes.toString(family1))) { - return true; - } - return super.isUnderCompaction(s); - } - } - }