From f8d03d122769b98d896b3a461c9dbb6a03976a05 Mon Sep 17 00:00:00 2001 From: Xiaolin Ha Date: Tue, 23 Nov 2021 15:45:21 +0800 Subject: [PATCH] HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874) Signed-off-by: Duo Zhang --- .../hbase/regionserver/CompactSplit.java | 70 ++++++++++-- .../hadoop/hbase/regionserver/HRegion.java | 12 ++- .../compactions/CompactionRequester.java | 6 ++ .../TestCompactionAfterBulkLoad.java | 101 ++++++++++++++---- 4 files changed, 157 insertions(+), 32 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index d40a8828127..bb8c9b83661 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -27,7 +27,9 @@ import java.io.StringWriter; import java.util.Comparator; import java.util.Iterator; import java.util.Optional; +import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; @@ -92,6 +94,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati private volatile ThreadPoolExecutor splits; private volatile ThroughputController compactionThroughputController; + private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet(); private volatile boolean compactionsEnabled; /** @@ -113,6 +116,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati CompactionThroughputControllerFactory.create(server, conf); } + // only for test + public CompactSplit(Configuration conf) { + this.server = null; + this.conf = conf; + this.compactionsEnabled = 
this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true); + createCompactionExecutors(); + createSplitExcecutors(); + } + private void createSplitExcecutors() { final String n = Thread.currentThread().getName(); int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); @@ -238,7 +250,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati createCompactionExecutors(); } - private interface CompactionCompleteTracker { + // set protected for test + protected interface CompactionCompleteTracker { default void completed(Store store) { } @@ -316,7 +329,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } } - private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, + // set protected for test + protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority, boolean selectNow, CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) throws IOException { if (this.server.isStopped() || (region.getTableDescriptor() != null && @@ -364,6 +378,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } pool.execute( new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); + if (LOG.isDebugEnabled()) { + LOG.debug("Add compact mark for store {}, priority={}, current under compaction " + + "store size is {}", getStoreNameForUnderCompaction(store), priority, + underCompactionStores.size()); + } + underCompactionStores.add(getStoreNameForUnderCompaction(store)); region.incrementCompactionsQueuedCount(); if (LOG.isDebugEnabled()) { String type = (pool == shortCompactions) ? 
"Small " : "Large "; @@ -377,8 +397,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati DUMMY_COMPLETE_TRACKER, null); } - public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) + public void requestSystemCompaction(HRegion region, HStore store, String why) throws IOException { + requestSystemCompaction(region, store, why, false); + } + + public synchronized void requestSystemCompaction(HRegion region, HStore store, String why, + boolean giveUpIfRequestedOrCompacting) throws IOException { + if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) { + LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region, + store.getColumnFamilyName()); + return; + } requestCompactionInternal(region, store, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); } @@ -471,6 +501,13 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati return this.regionSplitLimit; } + /** + * Check if this store is under compaction + */ + public boolean isUnderCompaction(final HStore s) { + return underCompactionStores.contains(getStoreNameForUnderCompaction(s)); + } + private static final Comparator COMPARATOR = new Comparator() { @@ -650,13 +687,22 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati @Override public void run() { - Preconditions.checkNotNull(server); - if (server.isStopped() || (region.getTableDescriptor() != null && - !region.getTableDescriptor().isCompactionEnabled())) { - region.decrementCompactionsQueuedCount(); - return; + try { + Preconditions.checkNotNull(server); + if (server.isStopped() || (region.getTableDescriptor() != null && + !region.getTableDescriptor().isCompactionEnabled())) { + region.decrementCompactionsQueuedCount(); + return; + } + doCompaction(user); + } finally { + if (LOG.isDebugEnabled()) { + LOG.debug("Remove under compaction mark for store: {}", + 
store.getHRegion().getRegionInfo().getEncodedName() + ":" + store + .getColumnFamilyName()); + } + underCompactionStores.remove(getStoreNameForUnderCompaction(store)); } - doCompaction(user); } private String formatStackTrace(Exception ex) { @@ -824,4 +870,10 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati return shortCompactions; } + private String getStoreNameForUnderCompaction(HStore store) { + return String.format("%s:%s", + store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "", + store.getColumnFamilyName()); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e412c479a40..3db87ec3bbf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; + import edu.umd.cs.findbugs.annotations.Nullable; import io.opentelemetry.api.trace.Span; import java.io.EOFException; @@ -72,6 +73,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -196,6 +198,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import 
org.apache.hbase.thirdparty.com.google.common.collect.Iterables; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -7046,11 +7049,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi HStore store = getStore(family); try { if (this.rsServices != null && store.needsCompaction()) { - this.rsServices.getCompactionRequestor().requestCompaction(this, store, - "bulkload hfiles request compaction", Store.PRIORITY_USER + 1, - CompactionLifeCycleTracker.DUMMY, null); - LOG.debug("bulkload hfiles request compaction region : {}, family : {}", - this.getRegionInfo(), family); + this.rsServices.getCompactionRequestor().requestSystemCompaction(this, store, + "bulkload hfiles request compaction", true); + LOG.info("Request compaction for region {} family {} after bulk load", + this.getRegionInfo().getEncodedName(), store.getColumnFamilyName()); } } catch (IOException e) { LOG.error("bulkload hfiles request compaction error ", e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java index e5f536007e8..31a7ca7ea4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java @@ -44,6 +44,12 @@ public interface CompactionRequester { void requestCompaction(HRegion region, HStore store, String why, int priority, CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException; + /** + * Request system compaction on the given store. 
+ */ + void requestSystemCompaction(HRegion region, HStore store, String why, + boolean giveUpIfRequestedOrCompacting) throws IOException; + /** * on/off compaction */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java index c736513e97a..b17995a591e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java @@ -18,49 +18,49 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE; + +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.hamcrest.MockitoHamcrest.argThat; - import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; -import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @Category(SmallTests.class) public class TestCompactionAfterBulkLoad extends TestBulkloadBase { - private final RegionServerServices regionServerServices = mock(RegionServerServices.class); - private final CompactionRequester compactionRequester = mock(CompactSplit.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class); + private final RegionServerServices regionServerServices = mock(RegionServerServices.class); + public static AtomicInteger called = new AtomicInteger(0); + @Override protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName, byte[]... 
families) throws IOException { @@ -79,7 +79,9 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase { } @Test - public void shouldRequestCompactionAfterBulkLoad() throws IOException { + public void shouldRequestCompactAllStoresAfterBulkLoad() throws IOException { + final CompactSplit compactSplit = new TestCompactSplit(HBaseConfiguration.create()); + called.set(0); List<Pair<byte[], String>> familyPaths = new ArrayList<>(); // enough hfile to request compaction for (int i = 0; i < 5; i++) { @@ -88,7 +90,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase { try { conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true); when(regionServerServices.getConfiguration()).thenReturn(conf); - when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester); + when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit); when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) .thenAnswer(new Answer<Object>() { @Override @@ -103,14 +105,77 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase { } }); - Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(), - any(), any()); - testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null); - // invoke three times for 3 families - verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class), - isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null)); + HRegion region = testRegionWithFamilies(family1, family2, family3); + region.bulkLoadHFiles(familyPaths, false, null); + assertEquals(3, called.get()); } finally { conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false); } } + + @Test + public void testAvoidRepeatedlyRequestCompactAfterBulkLoad() throws IOException { + final CompactSplit compactSplit = new TestFamily1UnderCompact(HBaseConfiguration.create()); + called.set(0); + List<Pair<byte[], String>> familyPaths = new ArrayList<>(); + // enough 
hfile to request compaction + for (int i = 0; i < 5; i++) { + familyPaths.addAll(withFamilyPathsFor(family1, family2, family3)); + } + try { + conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true); + when(regionServerServices.getConfiguration()).thenReturn(conf); + when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit); + when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) + .thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) { + WALKeyImpl walKey = invocation.getArgument(1); + MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); + if (mvcc != null) { + MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); + walKey.setWriteEntry(we); + } + return 01L; + } + }); + + HRegion region = testRegionWithFamilies(family1, family2, family3); + region.bulkLoadHFiles(familyPaths, false, null); + // only invoked twice: family1 is reported as under compaction, so its request is skipped + assertEquals(2, called.get()); + } finally { + conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false); + } + } + + private class TestCompactSplit extends CompactSplit { + + TestCompactSplit(Configuration conf) { + super(conf); + } + + @Override + protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority, + boolean selectNow, CompactionLifeCycleTracker tracker, + CompactionCompleteTracker completeTracker, User user) throws IOException { + called.addAndGet(1); + } + } + + private class TestFamily1UnderCompact extends TestCompactSplit { + + TestFamily1UnderCompact(Configuration conf) { + super(conf); + } + + @Override + public boolean isUnderCompaction(final HStore s) { + if (s.getColumnFamilyName().equals(Bytes.toString(family1))) { + return true; + } + return super.isUnderCompaction(s); + } + } + }