HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
Xiaolin Ha 2021-11-23 15:45:21 +08:00 committed by haxiaolin
parent 61406252a5
commit f8d03d1227
4 changed files with 157 additions and 32 deletions

View File

@ -27,7 +27,9 @@ import java.io.StringWriter;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
@ -92,6 +94,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
private volatile ThreadPoolExecutor splits; private volatile ThreadPoolExecutor splits;
private volatile ThroughputController compactionThroughputController; private volatile ThroughputController compactionThroughputController;
private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();
private volatile boolean compactionsEnabled; private volatile boolean compactionsEnabled;
/** /**
@ -113,6 +116,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
CompactionThroughputControllerFactory.create(server, conf); CompactionThroughputControllerFactory.create(server, conf);
} }
/**
 * Test-only constructor that builds a {@code CompactSplit} without a backing server.
 * <p>
 * The server reference is left {@code null}, the compaction-enabled flag is read from the
 * given configuration (defaulting to {@code true}), and then the compaction and split
 * executors are created.
 * <p>
 * NOTE(review): code paths that dereference {@code server} (for example the
 * {@code server.isStopped()} check in {@code requestCompactionInternal}) look unusable on an
 * instance built this way unless a subclass overrides them; confirm before using this
 * constructor outside of tests.
 *
 * @param conf configuration used to size the executors and to read the enable flag
 */
// only for test
public CompactSplit(Configuration conf) {
  this.conf = conf;
  this.server = null;
  this.compactionsEnabled = conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
  // Keep the original creation order: compaction executors first, then split executors.
  createCompactionExecutors();
  createSplitExcecutors();
}
private void createSplitExcecutors() { private void createSplitExcecutors() {
final String n = Thread.currentThread().getName(); final String n = Thread.currentThread().getName();
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
@ -238,7 +250,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
createCompactionExecutors(); createCompactionExecutors();
} }
private interface CompactionCompleteTracker { // set protected for test
protected interface CompactionCompleteTracker {
default void completed(Store store) { default void completed(Store store) {
} }
@ -316,7 +329,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
} }
} }
private void requestCompactionInternal(HRegion region, HStore store, String why, int priority, // set protected for test
protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
boolean selectNow, CompactionLifeCycleTracker tracker, boolean selectNow, CompactionLifeCycleTracker tracker,
CompactionCompleteTracker completeTracker, User user) throws IOException { CompactionCompleteTracker completeTracker, User user) throws IOException {
if (this.server.isStopped() || (region.getTableDescriptor() != null && if (this.server.isStopped() || (region.getTableDescriptor() != null &&
@ -364,6 +378,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
} }
pool.execute( pool.execute(
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
if (LOG.isDebugEnabled()) {
LOG.debug("Add compact mark for store {}, priority={}, current under compaction "
+ "store size is {}", getStoreNameForUnderCompaction(store), priority,
underCompactionStores.size());
}
underCompactionStores.add(getStoreNameForUnderCompaction(store));
region.incrementCompactionsQueuedCount(); region.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large "; String type = (pool == shortCompactions) ? "Small " : "Large ";
@ -377,8 +397,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
DUMMY_COMPLETE_TRACKER, null); DUMMY_COMPLETE_TRACKER, null);
} }
public synchronized void requestSystemCompaction(HRegion region, HStore store, String why) public void requestSystemCompaction(HRegion region, HStore store, String why)
throws IOException { throws IOException {
requestSystemCompaction(region, store, why, false);
}
public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
boolean giveUpIfRequestedOrCompacting) throws IOException {
if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
store.getColumnFamilyName());
return;
}
requestCompactionInternal(region, store, why, NO_PRIORITY, false, requestCompactionInternal(region, store, why, NO_PRIORITY, false,
CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null); CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
} }
@ -471,6 +501,13 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
return this.regionSplitLimit; return this.regionSplitLimit;
} }
/**
 * Tells whether the given store currently carries an "under compaction" mark.
 *
 * @param s the store to look up
 * @return {@code true} if the key built by {@code getStoreNameForUnderCompaction(s)} is
 *         present in {@code underCompactionStores}, {@code false} otherwise
 */
public boolean isUnderCompaction(final HStore s) {
  final String storeKey = getStoreNameForUnderCompaction(s);
  return underCompactionStores.contains(storeKey);
}
private static final Comparator<Runnable> COMPARATOR = private static final Comparator<Runnable> COMPARATOR =
new Comparator<Runnable>() { new Comparator<Runnable>() {
@ -650,13 +687,22 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
@Override @Override
public void run() { public void run() {
Preconditions.checkNotNull(server); try {
if (server.isStopped() || (region.getTableDescriptor() != null && Preconditions.checkNotNull(server);
!region.getTableDescriptor().isCompactionEnabled())) { if (server.isStopped() || (region.getTableDescriptor() != null &&
region.decrementCompactionsQueuedCount(); !region.getTableDescriptor().isCompactionEnabled())) {
return; region.decrementCompactionsQueuedCount();
return;
}
doCompaction(user);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("Remove under compaction mark for store: {}",
store.getHRegion().getRegionInfo().getEncodedName() + ":" + store
.getColumnFamilyName());
}
underCompactionStores.remove(getStoreNameForUnderCompaction(store));
} }
doCompaction(user);
} }
private String formatStackTrace(Exception ex) { private String formatStackTrace(Exception ex) {
@ -824,4 +870,10 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
return shortCompactions; return shortCompactions;
} }
/**
 * Builds the key under which a store is tracked in {@code underCompactionStores}.
 * <p>
 * The key has the form {@code <encoded region name>:<column family name>}; the region part
 * is the empty string when the store reports no region.
 *
 * @param store the store to build the key for
 * @return the tracking key for the store
 */
private String getStoreNameForUnderCompaction(HStore store) {
  final HRegion owningRegion = store.getHRegion();
  final String encodedRegionName;
  if (owningRegion == null) {
    encodedRegionName = "";
  } else {
    encodedRegionName = owningRegion.getRegionInfo().getEncodedName();
  }
  return encodedRegionName + ":" + store.getColumnFamilyName();
}
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import edu.umd.cs.findbugs.annotations.Nullable; import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Span;
import java.io.EOFException; import java.io.EOFException;
@ -72,6 +73,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -196,6 +198,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@ -7046,11 +7049,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
HStore store = getStore(family); HStore store = getStore(family);
try { try {
if (this.rsServices != null && store.needsCompaction()) { if (this.rsServices != null && store.needsCompaction()) {
this.rsServices.getCompactionRequestor().requestCompaction(this, store, this.rsServices.getCompactionRequestor().requestSystemCompaction(this, store,
"bulkload hfiles request compaction", Store.PRIORITY_USER + 1, "bulkload hfiles request compaction", true);
CompactionLifeCycleTracker.DUMMY, null); LOG.info("Request compaction for region {} family {} after bulk load",
LOG.debug("bulkload hfiles request compaction region : {}, family : {}", this.getRegionInfo().getEncodedName(), store.getColumnFamilyName());
this.getRegionInfo(), family);
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("bulkload hfiles request compaction error ", e); LOG.error("bulkload hfiles request compaction error ", e);

View File

@ -44,6 +44,12 @@ public interface CompactionRequester {
void requestCompaction(HRegion region, HStore store, String why, int priority, void requestCompaction(HRegion region, HStore store, String why, int priority,
CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException; CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
/**
* Request system compaction on the given store.
*/
void requestSystemCompaction(HRegion region, HStore store, String why,
boolean giveUpIfRequestedOrCompacting) throws IOException;
/** /**
* on/off compaction * on/off compaction
*/ */

View File

@ -18,49 +18,49 @@
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE; import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat; import static org.mockito.hamcrest.MockitoHamcrest.argThat;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@Category(SmallTests.class) @Category(SmallTests.class)
public class TestCompactionAfterBulkLoad extends TestBulkloadBase { public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
private final CompactionRequester compactionRequester = mock(CompactSplit.class);
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class); HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class);
private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
public static AtomicInteger called = new AtomicInteger(0);
@Override @Override
protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName, protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
byte[]... families) throws IOException { byte[]... families) throws IOException {
@ -79,7 +79,9 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
} }
@Test @Test
public void shouldRequestCompactionAfterBulkLoad() throws IOException { public void shouldRequestCompactAllStoresAfterBulkLoad() throws IOException {
final CompactSplit compactSplit = new TestCompactSplit(HBaseConfiguration.create());
called.set(0);
List<Pair<byte[], String>> familyPaths = new ArrayList<>(); List<Pair<byte[], String>> familyPaths = new ArrayList<>();
// enough hfile to request compaction // enough hfile to request compaction
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
@ -88,7 +90,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
try { try {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true); conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
when(regionServerServices.getConfiguration()).thenReturn(conf); when(regionServerServices.getConfiguration()).thenReturn(conf);
when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester); when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
.thenAnswer(new Answer() { .thenAnswer(new Answer() {
@Override @Override
@ -103,14 +105,77 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
} }
}); });
Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(), HRegion region = testRegionWithFamilies(family1, family2, family3);
any(), any()); region.bulkLoadHFiles(familyPaths, false, null);
testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null); assertEquals(3, called.get());
// invoke three times for 3 families
verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
} finally { } finally {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false); conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
} }
} }
/**
 * Verifies that a bulk load does not request another compaction for a store that is already
 * reported as under compaction.
 * <p>
 * The requester used here is a {@code TestFamily1UnderCompact}, which always reports family1
 * as under compaction and counts every call that reaches {@code requestCompactionInternal}
 * in {@code called}. After bulk loading files into three families, only the two families
 * that are not under compaction are expected to reach that method.
 *
 * @throws IOException if building the region or the bulk load fails
 */
@Test
public void testAvoidRepeatedlyRequestCompactAfterBulkLoad() throws IOException {
  final CompactSplit compactSplit = new TestFamily1UnderCompact(HBaseConfiguration.create());
  // The counter is static and shared between tests, so reset it before exercising the region.
  called.set(0);
  List<Pair<byte[], String>> familyPaths = new ArrayList<>();
  // enough hfile to request compaction
  for (int i = 0; i < 5; i++) {
    familyPaths.addAll(withFamilyPathsFor(family1, family2, family3));
  }
  try {
    conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
    when(regionServerServices.getConfiguration()).thenReturn(conf);
    when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
    when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
      .thenAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock invocation) {
          // Begin an MVCC write entry on the WAL key when it carries an MVCC instance.
          WALKeyImpl walKey = invocation.getArgument(1);
          MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
          if (mvcc != null) {
            MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
            walKey.setWriteEntry(we);
          }
          return 1L;
        }
      });

    HRegion region = testRegionWithFamilies(family1, family2, family3);
    region.bulkLoadHFiles(familyPaths, false, null);
    // invoked twice: family1 is reported as under compaction, so only the other 2 families
    // reach requestCompactionInternal
    assertEquals(2, called.get());
  } finally {
    // Restore the shared configuration so other tests see the default setting.
    conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
  }
}
private class TestCompactSplit extends CompactSplit {
TestCompactSplit(Configuration conf) {
super(conf);
}
@Override
protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
boolean selectNow, CompactionLifeCycleTracker tracker,
CompactionCompleteTracker completeTracker, User user) throws IOException {
called.addAndGet(1);
}
}
/**
 * Variant of {@code TestCompactSplit} that always reports the store of {@code family1} as
 * under compaction and defers to the parent implementation for every other store.
 */
private class TestFamily1UnderCompact extends TestCompactSplit {

  TestFamily1UnderCompact(Configuration conf) {
    super(conf);
  }

  /**
   * @param s the store to check
   * @return {@code true} when the store's column family name equals family1's name,
   *         otherwise whatever the parent implementation reports
   */
  @Override
  public boolean isUnderCompaction(final HStore s) {
    final boolean isFamily1 = s.getColumnFamilyName().equals(Bytes.toString(family1));
    return isFamily1 || super.isUnderCompaction(s);
  }
}
} }