HBASE-6028 Start/Stop compactions at region server level

Add switching on/off of compactions.

Switching off compactions will also interrupt any currently ongoing compactions.
Adds a "compaction_switch" to hbase shell. Switching off compactions will
interrupt any currently ongoing compactions. State set from shell will be
lost on restart. To persist the changes across region servers modify
hbase.regionserver.compaction.enabled in hbase-site.xml and restart.

Signed-off-by: Umesh Agashe <uagashe@cloudera.com>
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Mohit Goel 2018-06-07 15:11:28 -07:00 committed by Michael Stack
parent 4259da722e
commit 35a3c605f2
19 changed files with 508 additions and 35 deletions

View File

@ -995,6 +995,19 @@ public interface Admin extends Abortable, Closeable {
}
}
/**
* Turn the compaction on or off. Disabling compactions will also interrupt any currently ongoing
* compactions. This state is ephemeral. The setting will be lost on restart. Compaction
* can also be enabled/disabled by modifying configuration hbase.regionserver.compaction.enabled
* in hbase-site.xml.
*
* @param switchState Set to <code>true</code> to enable, <code>false</code> to disable.
* @param serverNamesList list of region servers.
* @return Previous compaction states for region servers
*/
Map<ServerName, Boolean> compactionSwitch(boolean switchState, List<String> serverNamesList)
throws IOException;
/**
* Compact all regions on the region server. Asynchronous operation in that this method requests
* that a Compaction run and then it returns. It does not wait on the completion of Compaction (it

View File

@ -1240,4 +1240,17 @@ public interface AsyncAdmin {
*/
CompletableFuture<Void> cloneTableSchema(final TableName tableName,
final TableName newTableName, final boolean preserveSplits);
/**
* Turn the compaction on or off. Disabling compactions will also interrupt any currently ongoing
* compactions. This state is ephemeral. The setting will be lost on restart. Compaction
* can also be enabled/disabled by modifying configuration hbase.regionserver.compaction.enabled
* in hbase-site.xml.
*
* @param switchState Set to <code>true</code> to enable, <code>false</code> to disable.
* @param serverNamesList list of region servers.
* @return Previous compaction states for region servers
*/
CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
List<String> serverNamesList);
}

View File

@ -752,4 +752,10 @@ class AsyncHBaseAdmin implements AsyncAdmin {
boolean preserveSplits) {
return wrap(rawAdmin.cloneTableSchema(tableName, newTableName, preserveSplits));
}
@Override
public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
List<String> serverNamesList) {
return wrap(rawAdmin.compactionSwitch(switchState, serverNamesList));
}
}

View File

@ -113,6 +113,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
@ -1262,6 +1264,51 @@ public class HBaseAdmin implements Admin {
compactRegion(regionName, columnFamily, false);
}
@Override
public Map<ServerName, Boolean> compactionSwitch(boolean switchState, List<String>
serverNamesList) throws IOException {
List<ServerName> serverList = new ArrayList<>();
if (serverNamesList.isEmpty()) {
ClusterMetrics status = getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
serverList.addAll(status.getLiveServerMetrics().keySet());
} else {
for (String regionServerName: serverNamesList) {
ServerName serverName = null;
try {
serverName = ServerName.valueOf(regionServerName);
} catch (Exception e) {
throw new IllegalArgumentException(String.format("Invalid ServerName format: %s",
regionServerName));
}
if (serverName == null) {
throw new IllegalArgumentException(String.format("Null ServerName: %s",
regionServerName));
}
serverList.add(serverName);
}
}
Map<ServerName, Boolean> res = new HashMap<>(serverList.size());
for (ServerName serverName: serverList) {
boolean prev_state = switchCompact(this.connection.getAdmin(serverName), switchState);
res.put(serverName, prev_state);
}
return res;
}
private Boolean switchCompact(AdminService.BlockingInterface admin, boolean onOrOff)
throws IOException {
return executeCallable(new RpcRetryingCallable<Boolean>() {
@Override protected Boolean rpcCall(int callTimeout) throws Exception {
HBaseRpcController controller = rpcControllerFactory.newController();
CompactionSwitchRequest request =
CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build();
CompactionSwitchResponse compactionSwitchResponse =
admin.compactionSwitch(controller, request);
return compactionSwitchResponse.getPrevState();
}
});
}
@Override
public void compactRegionServer(final ServerName serverName) throws IOException {
for (RegionInfo region : getRegions(serverName)) {

View File

@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@ -104,6 +105,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegion
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@ -2986,6 +2989,85 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
return future;
}
@Override
public CompletableFuture<Map<ServerName, Boolean>> compactionSwitch(boolean switchState,
List<String> serverNamesList) {
CompletableFuture<Map<ServerName, Boolean>> future = new CompletableFuture<>();
getRegionServerList(serverNamesList).whenComplete((serverNames, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
//Accessed by multiple threads.
Map<ServerName, Boolean> serverStates = new ConcurrentHashMap<>(serverNames.size());
List<CompletableFuture<Boolean>> futures = new ArrayList<>(serverNames.size());
serverNames.stream().forEach(serverName -> {
futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
serverStates.put(serverName, serverState);
}
}));
});
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
.whenComplete((ret, err3) -> {
if (!future.isCompletedExceptionally()) {
if (err3 != null) {
future.completeExceptionally(err3);
} else {
future.complete(serverStates);
}
}
});
});
return future;
}
private CompletableFuture<List<ServerName>> getRegionServerList(List<String> serverNamesList) {
CompletableFuture<List<ServerName>> future = new CompletableFuture<>();
if (serverNamesList.isEmpty()) {
CompletableFuture<ClusterMetrics> clusterMetricsCompletableFuture =
getClusterMetrics(EnumSet.of(Option
.LIVE_SERVERS));
clusterMetricsCompletableFuture.whenComplete((clusterMetrics, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else {
future.complete(new ArrayList<>(clusterMetrics.getLiveServerMetrics().keySet()));
}
});
return future;
} else {
List<ServerName> serverList = new ArrayList<>();
for (String regionServerName: serverNamesList) {
ServerName serverName = null;
try {
serverName = ServerName.valueOf(regionServerName);
} catch (Exception e) {
future.completeExceptionally(new IllegalArgumentException(
String.format("ServerName format: %s", regionServerName)));
}
if (serverName == null) {
future.completeExceptionally(new IllegalArgumentException(
String.format("Null ServerName: %s", regionServerName)));
}
}
future.complete(serverList);
}
return future;
}
private CompletableFuture<Boolean> switchCompact(ServerName serverName, boolean onOrOff) {
return this
.<Boolean>newAdminCaller()
.serverName(serverName)
.action((controller, stub) -> this.<CompactionSwitchRequest, CompactionSwitchResponse,
Boolean>adminCall(controller, stub,
CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) ->
s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call();
}
@Override
public CompletableFuture<Boolean> balancerSwitch(final boolean on) {
return this

View File

@ -728,6 +728,13 @@ possible configurations would overwhelm and obscure the important.
StoreFiles into a single StoreFile. Larger values delay compaction, but when compaction does
occur, it takes longer to complete.</description>
</property>
<property>
<name>hbase.regionserver.compaction.enabled</name>
<value>true</value>
<description>Enable/disable compactions on by setting true/false.
We can further switch compactions dynamically with the
compaction_switch shell command.</description>
</property>
<property>
<name>hbase.hstore.flusher.count</name>
<value>2</value>

View File

@ -157,6 +157,14 @@ message CompactRegionRequest {
message CompactRegionResponse {
}
message CompactionSwitchRequest {
required bool enabled = 1;
}
message CompactionSwitchResponse {
required bool prev_state = 1;
}
message UpdateFavoredNodesRequest {
repeated RegionUpdateInfo update_info = 1;
@ -293,6 +301,9 @@ service AdminService {
rpc FlushRegion(FlushRegionRequest)
returns(FlushRegionResponse);
rpc CompactionSwitch(CompactionSwitchRequest)
returns(CompactionSwitchResponse);
rpc CompactRegion(CompactRegionRequest)
returns(CompactRegionResponse);

View File

@ -82,16 +82,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
"hbase.regionserver.regionSplitLimit";
public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
"hbase.regionserver.compaction.enabled";
private final HRegionServer server;
private final Configuration conf;
private final ThreadPoolExecutor longCompactions;
private final ThreadPoolExecutor shortCompactions;
private final ThreadPoolExecutor splits;
private volatile ThreadPoolExecutor longCompactions;
private volatile ThreadPoolExecutor shortCompactions;
private volatile ThreadPoolExecutor splits;
private volatile ThroughputController compactionThroughputController;
private volatile boolean compactionsEnabled;
/**
* Splitting should not take place if the total number of regions exceed this.
* This is not a hard limit to the number of regions but it is a guideline to
@ -103,15 +105,35 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
CompactSplit(HRegionServer server) {
this.server = server;
this.conf = server.getConfiguration();
this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
createCompactionExecutors();
createSplitExcecutors();
int largeThreads = Math.max(1, conf.getInt(
LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
int smallThreads = conf.getInt(
SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
// compaction throughput controller
this.compactionThroughputController =
CompactionThroughputControllerFactory.create(server, conf);
}
private void createSplitExcecutors() {
final String n = Thread.currentThread().getName();
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
this.splits =
(ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
String name = n + "-splits-" + System.currentTimeMillis();
return new Thread(r, name);
}
});
}
private void createCompactionExecutors() {
this.regionSplitLimit =
conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
int largeThreads =
Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
// if we have throttle threads, make sure the user also specified size
Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
@ -119,8 +141,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
final String n = Thread.currentThread().getName();
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
60, TimeUnit.SECONDS, stealJobQueue,
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60,
TimeUnit.SECONDS, stealJobQueue,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -130,8 +152,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
});
this.longCompactions.setRejectedExecutionHandler(new Rejection());
this.longCompactions.prestartAllCoreThreads();
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60,
TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -139,21 +161,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
return new Thread(r, name);
}
});
this.shortCompactions
.setRejectedExecutionHandler(new Rejection());
this.splits = (ThreadPoolExecutor)
Executors.newFixedThreadPool(splitThreads,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
String name = n + "-splits-" + System.currentTimeMillis();
return new Thread(r, name);
}
});
// compaction throughput controller
this.compactionThroughputController =
CompactionThroughputControllerFactory.create(server, conf);
this.shortCompactions.setRejectedExecutionHandler(new Rejection());
}
@Override
@ -236,6 +244,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
}
private void interrupt() {
longCompactions.shutdownNow();
shortCompactions.shutdownNow();
}
private void reInitializeCompactionsExecutors() {
createCompactionExecutors();
}
private interface CompactionCompleteTracker {
default void completed(Store store) {
@ -290,6 +307,21 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
getCompleteTracker(tracker, () -> 1), user);
}
@Override
public void switchCompaction(boolean onOrOff) {
if (onOrOff) {
// re-create executor pool if compactions are disabled.
if (!isCompactionsEnabled()) {
LOG.info("Re-Initializing compactions because user switched on compactions");
reInitializeCompactionsExecutors();
}
} else {
LOG.info("Interrupting running compactions because user switched off compactions");
interrupt();
}
setCompactionsEnabled(onOrOff);
}
private void requestCompactionInternal(HRegion region, String why, int priority,
boolean selectNow, CompactionLifeCycleTracker tracker,
CompactionCompleteTracker completeTracker, User user) throws IOException {
@ -366,6 +398,11 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
throws IOException {
// don't even select for compaction if disableCompactions is set to true
if (!isCompactionsEnabled()) {
LOG.info(String.format("User has disabled compactions"));
return Optional.empty();
}
Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
if (!compaction.isPresent() && region.getRegionInfo() != null) {
String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
@ -777,4 +814,28 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
public void clearShortCompactionsQueue() {
shortCompactions.getQueue().clear();
}
public boolean isCompactionsEnabled() {
return compactionsEnabled;
}
public void setCompactionsEnabled(boolean compactionsEnabled) {
this.compactionsEnabled = compactionsEnabled;
this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled));
}
/**
* @return the longCompactions thread pool executor
*/
ThreadPoolExecutor getLongCompactions() {
return longCompactions;
}
/**
* @return the shortCompactions thread pool executor
*/
ThreadPoolExecutor getShortCompactions() {
return shortCompactions;
}
}

View File

@ -8218,7 +8218,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
break;
}
if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
|| op == Operation.COMPACT_REGION) {
|| op == Operation.COMPACT_REGION || op == Operation.COMPACT_SWITCH) {
// split, merge or compact region doesn't need to check the closing/closed state or lock the
// region
return;

View File

@ -163,6 +163,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegion
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
@ -1658,6 +1660,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
@Override
public CompactionSwitchResponse compactionSwitch(RpcController controller,
CompactionSwitchRequest request) throws ServiceException {
try {
checkOpen();
requestCount.increment();
boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled();
CompactionSwitchResponse response =
CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
if (prevState == request.getEnabled()) {
// passed in requested state is same as current state. No action required
return response;
}
regionServer.compactSplitThread.switchCompaction(request.getEnabled());
return response;
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
/**
* Flush a region on the region server.
*

View File

@ -193,7 +193,7 @@ public interface Region extends ConfigurationObserver {
*/
enum Operation {
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT, COMPACT_SWITCH
}
/**

View File

@ -43,4 +43,10 @@ public interface CompactionRequester {
*/
void requestCompaction(HRegion region, HStore store, String why, int priority,
CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
/**
* on/off compaction
*/
void switchCompaction(boolean onOrOff);
}

View File

@ -26,6 +26,8 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -306,6 +308,42 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
assertEquals(3, countAfterMajorCompaction);
}
@Test
public void testCompactionSwitchStates() throws Exception {
// Create a table with regions
byte[] family = Bytes.toBytes("family");
byte[][] families = {family, Bytes.add(family, Bytes.toBytes("2")),
Bytes.add(family, Bytes.toBytes("3"))};
createTableWithDefaultConf(tableName, null, families);
loadData(tableName, families, 3000, 8);
List<Region> regions = new ArrayList<>();
TEST_UTIL
.getHBaseCluster()
.getLiveRegionServerThreads()
.forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture =
admin.compactionSwitch(true, new ArrayList<>());
Map<ServerName, Boolean> pairs = listCompletableFuture.get();
for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) {
assertEquals("Default compaction state, expected=enabled actual=disabled",
true, p.getValue());
}
CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 =
admin.compactionSwitch(false, new ArrayList<>());
Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get();
for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) {
assertEquals("Last compaction state, expected=enabled actual=disabled",
true, p.getValue());
}
CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 =
admin.compactionSwitch(true, new ArrayList<>());
Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get();
for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) {
assertEquals("Last compaction state, expected=disabled actual=enabled",
false, p.getValue());
}
}
@Test
public void testCompact() throws Exception {
compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false);

View File

@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegion
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
@ -481,6 +483,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return null;
}
@Override
public CompactionSwitchResponse compactionSwitch(RpcController controller,
CompactionSwitchRequest request) throws ServiceException {
return null;
}
@Override
public CompactRegionResponse compactRegion(RpcController controller,
CompactRegionRequest request) throws ServiceException {

View File

@ -360,6 +360,75 @@ public class TestCompaction {
postFailedCount > preFailedCount);
}
/**
* Test no new Compaction requests are generated after calling stop compactions
*/
@Test public void testStopStartCompaction() throws IOException {
// setup a compact/split thread on a mock server
HRegionServer mockServer = Mockito.mock(HRegionServer.class);
Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
CompactSplit thread = new CompactSplit(mockServer);
Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
// setup a region/store with some files
HStore store = r.getStore(COLUMN_FAMILY);
createStoreFile(r);
for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
createStoreFile(r);
}
thread.switchCompaction(false);
thread
.requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY,
null);
assertEquals(false, thread.isCompactionsEnabled());
assertEquals(0, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions()
.getActiveCount());
thread.switchCompaction(true);
assertEquals(true, thread.isCompactionsEnabled());
thread
.requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY,
null);
assertEquals(1, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions()
.getActiveCount());
}
@Test public void testInterruptingRunningCompactions() throws Exception {
// setup a compact/split thread on a mock server
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
WaitThroughPutController.class.getName());
HRegionServer mockServer = Mockito.mock(HRegionServer.class);
Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
CompactSplit thread = new CompactSplit(mockServer);
Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
// setup a region/store with some files
HStore store = r.getStore(COLUMN_FAMILY);
int jmax = (int) Math.ceil(15.0 / compactionThreshold);
byte[] pad = new byte[1000]; // 1 KB chunk
for (int i = 0; i < compactionThreshold; i++) {
Table loader = new RegionAsTable(r);
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
p.setDurability(Durability.SKIP_WAL);
for (int j = 0; j < jmax; j++) {
p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
}
HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
loader.put(p);
r.flush(true);
}
HStore s = r.getStore(COLUMN_FAMILY);
int initialFiles = s.getStorefilesCount();
thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER,
CompactionLifeCycleTracker.DUMMY, null);
Thread.sleep(3000);
thread.switchCompaction(false);
assertEquals(initialFiles, s.getStorefilesCount());
//don't mess up future tests
thread.switchCompaction(true);
}
/**
* HBASE-7947: Regression test to ensure adding to the correct list in the
* {@link CompactSplit}
@ -712,4 +781,20 @@ public class TestCompaction {
done.countDown();
}
}
/**
* Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
* finishes.
*/
public static class WaitThroughPutController extends NoLimitThroughputController{
public WaitThroughPutController() {
}
@Override
public long control(String compactionName, long size) throws InterruptedException {
Thread.sleep(6000000);
return 6000000;
}
}
}

View File

@ -85,6 +85,19 @@ module Hbase
end
end
#----------------------------------------------------------------------------------------------
# Switch compaction on/off at runtime on a region server
def compaction_switch(on_or_off, regionserver_names)
region_servers = regionserver_names.flatten.compact
servers = java.util.ArrayList.new
if region_servers.any?
region_servers.each do |s|
servers.add(s)
end
end
@admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers)
end
#----------------------------------------------------------------------------------------------
# Gets compaction state for specified table
def getCompactionState(table_name)

View File

@ -334,6 +334,7 @@ Shell.load_command_group(
is_in_maintenance_mode
close_region
compact
compaction_switch
flush
major_compact
move

View File

@ -0,0 +1,52 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
# Switch compaction for a region server
class CompactionSwitch < Command
def help
<<-EOF
Turn the compaction on or off on regionservers. Disabling compactions will also interrupt
any currently ongoing compactions. This state is ephemeral. The setting will be lost on
restart of the server. Compaction can also be enabled/disabled by modifying configuration
hbase.regionserver.compaction.enabled in hbase-site.xml.
Examples:
To enable compactions on all region servers
hbase> compaction_switch true
To disable compactions on all region servers
hbase> compaction_switch false
To enable compactions on specific region servers
hbase> compaction_switch true 'server2','server1'
To disable compactions on specific region servers
hbase> compaction_switch false 'server2','server1'
NOTE: A server name is its host, port plus startcode. For example:
host187.example.com,60020,1289493121758
EOF
end
def command(enable_disable, *server)
formatter.header(%w(['SERVER' 'PREV_STATE']))
prev_state = admin.compaction_switch(enable_disable, server)
prev_state.each { |k, v| formatter.row([k.getServerName, java.lang.String.valueOf(v)]) }
formatter.footer(prev_state.size)
end
end
end
end

View File

@ -1845,6 +1845,14 @@ See <<managed.compactions>>.
Compactions do not perform region merges.
See <<ops.regionmgt.merge>> for more information on region merging.
.Compaction Switch
We can switch on and off the compactions at region servers. Switching off compactions will also
interrupt any currently ongoing compactions. It can be done dynamically using the "compaction_switch"
command from hbase shell. If done from the command line, this setting will be lost on restart of the
server. To persist the changes across region servers modify the configuration hbase.regionserver
.compaction.enabled in hbase-site.xml and restart HBase.
[[compaction.file.selection]]
===== Compaction Policy - HBase 0.96.x and newer