Revert "HBASE-26249 Ameliorate compaction made by bul… (#3831)"

This reverts commit 5e62e2aa8d.
This commit is contained in:
haxiaolin 2021-11-23 11:15:14 +08:00
parent 5e62e2aa8d
commit 2aafd31ff5
4 changed files with 32 additions and 157 deletions

View File

@ -27,9 +27,7 @@ import java.io.StringWriter;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
@ -94,7 +92,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
private volatile ThreadPoolExecutor splits;
private volatile ThroughputController compactionThroughputController;
private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();
private volatile boolean compactionsEnabled;
/**
@ -116,15 +113,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
CompactionThroughputControllerFactory.create(server, conf);
}
// only for test
public CompactSplit(Configuration conf) {
this.server = null;
this.conf = conf;
this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
createCompactionExecutors();
createSplitExcecutors();
}
private void createSplitExcecutors() {
final String n = Thread.currentThread().getName();
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
@ -250,8 +238,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
createCompactionExecutors();
}
// set protected for test
protected interface CompactionCompleteTracker {
private interface CompactionCompleteTracker {
default void completed(Store store) {
}
@ -329,8 +316,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
}
// set protected for test
protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
boolean selectNow, CompactionLifeCycleTracker tracker,
CompactionCompleteTracker completeTracker, User user) throws IOException {
if (this.server.isStopped() || (region.getTableDescriptor() != null &&
@ -378,12 +364,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
pool.execute(
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
if (LOG.isDebugEnabled()) {
LOG.debug("Add compact mark for store {}, priority={}, current under compaction "
+ "store size is {}", getStoreNameForUnderCompaction(store), priority,
underCompactionStores.size());
}
underCompactionStores.add(getStoreNameForUnderCompaction(store));
region.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large ";
@ -397,18 +377,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
DUMMY_COMPLETE_TRACKER, null);
}
public void requestSystemCompaction(HRegion region, HStore store, String why)
public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
throws IOException {
requestSystemCompaction(region, store, why, false);
}
public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
boolean giveUpIfRequestedOrCompacting) throws IOException {
if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
store.getColumnFamilyName());
return;
}
requestCompactionInternal(region, store, why, NO_PRIORITY, false,
CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
}
@ -501,13 +471,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
return this.regionSplitLimit;
}
/**
* Check if this store is under compaction
*/
public boolean isUnderCompaction(final HStore s) {
return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
}
private static final Comparator<Runnable> COMPARATOR =
new Comparator<Runnable>() {
@ -687,22 +650,13 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
@Override
public void run() {
try {
Preconditions.checkNotNull(server);
if (server.isStopped() || (region.getTableDescriptor() != null &&
!region.getTableDescriptor().isCompactionEnabled())) {
region.decrementCompactionsQueuedCount();
return;
}
doCompaction(user);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("Remove under compaction mark for store: {}",
store.getHRegion().getRegionInfo().getEncodedName() + ":" + store
.getColumnFamilyName());
}
underCompactionStores.remove(getStoreNameForUnderCompaction(store));
Preconditions.checkNotNull(server);
if (server.isStopped() || (region.getTableDescriptor() != null &&
!region.getTableDescriptor().isCompactionEnabled())) {
region.decrementCompactionsQueuedCount();
return;
}
doCompaction(user);
}
private String formatStackTrace(Exception ex) {
@ -870,10 +824,4 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
return shortCompactions;
}
private String getStoreNameForUnderCompaction(HStore store) {
return String.format("%s:%s",
store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
store.getColumnFamilyName());
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span;
import java.io.EOFException;
@ -69,7 +68,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -179,7 +177,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@ -7053,10 +7050,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
HStore store = getStore(family);
try {
if (this.rsServices != null && store.needsCompaction()) {
this.rsServices.getCompactionRequestor().requestSystemCompaction(this, store,
"bulkload hfiles request compaction", true);
LOG.info("Request compaction for region {} family {} after bulk load",
this.getRegionInfo().getEncodedName(), store.getColumnFamilyName());
this.rsServices.getCompactionRequestor().requestCompaction(this, store,
"bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
CompactionLifeCycleTracker.DUMMY, null);
LOG.debug("bulkload hfiles request compaction region : {}, family : {}",
this.getRegionInfo(), family);
}
} catch (IOException e) {
LOG.error("bulkload hfiles request compaction error ", e);

View File

@ -44,12 +44,6 @@ public interface CompactionRequester {
void requestCompaction(HRegion region, HStore store, String why, int priority,
CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
/**
* Request system compaction on the given store.
*/
void requestSystemCompaction(HRegion region, HStore store, String why,
boolean giveUpIfRequestedOrCompacting) throws IOException;
/**
* on/off compaction
*/

View File

@ -18,49 +18,49 @@
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@Category(SmallTests.class)
public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
private final CompactionRequester compactionRequester = mock(CompactSplit.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class);
private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
public static AtomicInteger called = new AtomicInteger(0);
@Override
protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
byte[]... families) throws IOException {
@ -79,9 +79,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
}
@Test
public void shouldRequestCompactAllStoresAfterBulkLoad() throws IOException {
final CompactSplit compactSplit = new TestCompactSplit(HBaseConfiguration.create());
called.set(0);
public void shouldRequestCompactionAfterBulkLoad() throws IOException {
List<Pair<byte[], String>> familyPaths = new ArrayList<>();
// enough hfile to request compaction
for (int i = 0; i < 5; i++) {
@ -90,7 +88,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
try {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
when(regionServerServices.getConfiguration()).thenReturn(conf);
when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
.thenAnswer(new Answer() {
@Override
@ -105,77 +103,14 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
}
});
HRegion region = testRegionWithFamilies(family1, family2, family3);
region.bulkLoadHFiles(familyPaths, false, null);
assertEquals(3, called.get());
Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
any(), any());
testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
// invoke three times for 3 families
verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
} finally {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
}
}
@Test
public void testAvoidRepeatedlyRequestCompactAfterBulkLoad() throws IOException {
final CompactSplit compactSplit = new TestFamily1UnderCompact(HBaseConfiguration.create());
called.set(0);
List<Pair<byte[], String>> familyPaths = new ArrayList<>();
// enough hfile to request compaction
for (int i = 0; i < 5; i++) {
familyPaths.addAll(withFamilyPathsFor(family1, family2, family3));
}
try {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
when(regionServerServices.getConfiguration()).thenReturn(conf);
when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
.thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) {
WALKeyImpl walKey = invocation.getArgument(1);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
walKey.setWriteEntry(we);
}
return 01L;
}
});
HRegion region = testRegionWithFamilies(family1, family2, family3);
region.bulkLoadHFiles(familyPaths, false, null);
// invoke three times for 2 families
assertEquals(2, called.get());
} finally {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
}
}
private class TestCompactSplit extends CompactSplit {
TestCompactSplit(Configuration conf) {
super(conf);
}
@Override
protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
boolean selectNow, CompactionLifeCycleTracker tracker,
CompactionCompleteTracker completeTracker, User user) throws IOException {
called.addAndGet(1);
}
}
private class TestFamily1UnderCompact extends TestCompactSplit {
TestFamily1UnderCompact(Configuration conf) {
super(conf);
}
@Override
public boolean isUnderCompaction(final HStore s) {
if (s.getColumnFamilyName().equals(Bytes.toString(family1))) {
return true;
}
return super.isUnderCompaction(s);
}
}
}