diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 331f2d119e3..01ebb66fd72 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -995,6 +995,19 @@ public interface Admin extends Abortable, Closeable { } } + /** + * Turn the compaction on or off. Disabling compactions will also interrupt any currently ongoing + * compactions. This state is ephemeral. The setting will be lost on restart. Compaction + * can also be enabled/disabled by modifying configuration hbase.regionserver.compaction.enabled + * in hbase-site.xml. + * + * @param switchState Set to true to enable, false to disable. + * @param serverNamesList list of region servers. + * @return Previous compaction states for region servers + */ + Map compactionSwitch(boolean switchState, List serverNamesList) + throws IOException; + /** * Compact all regions on the region server. Asynchronous operation in that this method requests * that a Compaction run and then it returns. It does not wait on the completion of Compaction (it diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 8141e74de75..0e47de82ba1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1240,4 +1240,17 @@ public interface AsyncAdmin { */ CompletableFuture cloneTableSchema(final TableName tableName, final TableName newTableName, final boolean preserveSplits); + + /** + * Turn the compaction on or off. Disabling compactions will also interrupt any currently ongoing + * compactions. This state is ephemeral. The setting will be lost on restart. Compaction + * can also be enabled/disabled by modifying configuration hbase.regionserver.compaction.enabled + * in hbase-site.xml. + * + * @param switchState Set to true to enable, false to disable. + * @param serverNamesList list of region servers. + * @return Previous compaction states for region servers + */ + CompletableFuture> compactionSwitch(boolean switchState, + List serverNamesList); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 5b22668c6b4..8b6c6293bca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -752,4 +752,10 @@ class AsyncHBaseAdmin implements AsyncAdmin { boolean preserveSplits) { return wrap(rawAdmin.cloneTableSchema(tableName, newTableName, preserveSplits)); } + + @Override + public CompletableFuture> compactionSwitch(boolean switchState, + List serverNamesList) { + return wrap(rawAdmin.compactionSwitch(switchState, serverNamesList)); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 7f3abea9a30..8bc26f17e5c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -113,6 +113,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; @@ -1262,6 +1264,51 @@ public class HBaseAdmin implements Admin { compactRegion(regionName, columnFamily, false); } + @Override + public Map compactionSwitch(boolean switchState, List + serverNamesList) throws IOException { + List serverList = new ArrayList<>(); + if (serverNamesList.isEmpty()) { + ClusterMetrics status = getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); + serverList.addAll(status.getLiveServerMetrics().keySet()); + } else { + for (String regionServerName: serverNamesList) { + ServerName serverName = null; + try { + serverName = ServerName.valueOf(regionServerName); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("Invalid ServerName format: %s", + regionServerName)); + } + if (serverName == null) { + throw new IllegalArgumentException(String.format("Null ServerName: %s", + regionServerName)); + } + serverList.add(serverName); + } + } + Map res = new HashMap<>(serverList.size()); + for (ServerName serverName: serverList) { + boolean prev_state = switchCompact(this.connection.getAdmin(serverName), switchState); + res.put(serverName, prev_state); + } + return res; + } + + private Boolean switchCompact(AdminService.BlockingInterface admin, boolean onOrOff) + throws IOException { + return executeCallable(new RpcRetryingCallable() { + @Override protected Boolean rpcCall(int callTimeout) throws Exception { + HBaseRpcController controller = rpcControllerFactory.newController(); + CompactionSwitchRequest request = + CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(); + CompactionSwitchResponse compactionSwitchResponse = + admin.compactionSwitch(controller, request); + return compactionSwitchResponse.getPrevState(); + } + }); + } + @Override public void compactRegionServer(final ServerName serverName) throws IOException { for (RegionInfo region : getRegions(serverName)) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 0fd0e59ab90..1edfb35df6b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -104,6 +105,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegion import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -2986,6 +2989,85 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } + @Override + public CompletableFuture> compactionSwitch(boolean switchState, + List serverNamesList) { + CompletableFuture> future = new CompletableFuture<>(); + getRegionServerList(serverNamesList).whenComplete((serverNames, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + //Accessed by multiple threads. + Map serverStates = new ConcurrentHashMap<>(serverNames.size()); + List> futures = new ArrayList<>(serverNames.size()); + serverNames.stream().forEach(serverName -> { + futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + serverStates.put(serverName, serverState); + } + })); + }); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) + .whenComplete((ret, err3) -> { + if (!future.isCompletedExceptionally()) { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(serverStates); + } + } + }); + }); + return future; + } + + private CompletableFuture> getRegionServerList(List serverNamesList) { + CompletableFuture> future = new CompletableFuture<>(); + if (serverNamesList.isEmpty()) { + CompletableFuture clusterMetricsCompletableFuture = + getClusterMetrics(EnumSet.of(Option + .LIVE_SERVERS)); + clusterMetricsCompletableFuture.whenComplete((clusterMetrics, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else { + future.complete(new ArrayList<>(clusterMetrics.getLiveServerMetrics().keySet())); + } + }); + return future; + } else { + List serverList = new ArrayList<>(); + for (String regionServerName: serverNamesList) { + ServerName serverName = null; + try { + serverName = ServerName.valueOf(regionServerName); + } catch (Exception e) { + future.completeExceptionally(new IllegalArgumentException( + String.format("ServerName format: %s", regionServerName))); + } + if (serverName == null) { + future.completeExceptionally(new IllegalArgumentException( + String.format("Null ServerName: %s", regionServerName))); + } + } + future.complete(serverList); + } + return future; + } + + private CompletableFuture switchCompact(ServerName serverName, boolean onOrOff) { + return this + .newAdminCaller() + .serverName(serverName) + .action((controller, stub) -> this.adminCall(controller, stub, + CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) -> + s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call(); + } + @Override public CompletableFuture balancerSwitch(final boolean on) { return this diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index d7e4476be32..62c82b9b3ae 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -728,6 +728,13 @@ possible configurations would overwhelm and obscure the important. StoreFiles into a single StoreFile. Larger values delay compaction, but when compaction does occur, it takes longer to complete. + + hbase.regionserver.compaction.enabled + true + Enable/disable compactions on by setting true/false. + We can further switch compactions dynamically with the + compaction_switch shell command. + hbase.hstore.flusher.count 2 diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto index ddcc266ea22..c622d589c6f 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto @@ -157,6 +157,14 @@ message CompactRegionRequest { message CompactRegionResponse { } +message CompactionSwitchRequest { + required bool enabled = 1; +} + +message CompactionSwitchResponse { + required bool prev_state = 1; +} + message UpdateFavoredNodesRequest { repeated RegionUpdateInfo update_info = 1; @@ -293,6 +301,9 @@ service AdminService { rpc FlushRegion(FlushRegionRequest) returns(FlushRegionResponse); + rpc CompactionSwitch(CompactionSwitchRequest) + returns(CompactionSwitchResponse); + rpc CompactRegion(CompactRegionRequest) returns(CompactRegionResponse); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 98df9b167ec..fbf73f36ee8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -82,16 +82,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati public static final String REGION_SERVER_REGION_SPLIT_LIMIT = "hbase.regionserver.regionSplitLimit"; public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; + public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = + "hbase.regionserver.compaction.enabled"; private final HRegionServer server; private final Configuration conf; - - private final ThreadPoolExecutor longCompactions; - private final ThreadPoolExecutor shortCompactions; - private final ThreadPoolExecutor splits; + private volatile ThreadPoolExecutor longCompactions; + private volatile ThreadPoolExecutor shortCompactions; + private volatile ThreadPoolExecutor splits; private volatile ThroughputController compactionThroughputController; + private volatile boolean compactionsEnabled; /** * Splitting should not take place if the total number of regions exceed this. * This is not a hard limit to the number of regions but it is a guideline to @@ -103,15 +105,35 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati CompactSplit(HRegionServer server) { this.server = server; this.conf = server.getConfiguration(); - this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, - DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); + this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true); + createCompactionExecutors(); + createSplitExcecutors(); - int largeThreads = Math.max(1, conf.getInt( - LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); - int smallThreads = conf.getInt( - SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); + // compaction throughput controller + this.compactionThroughputController = + CompactionThroughputControllerFactory.create(server, conf); + } + private void createSplitExcecutors() { + final String n = Thread.currentThread().getName(); int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); + this.splits = + (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + String name = n + "-splits-" + System.currentTimeMillis(); + return new Thread(r, name); + } + }); + } + + private void createCompactionExecutors() { + this.regionSplitLimit = + conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); + + int largeThreads = + Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); + int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); // if we have throttle threads, make sure the user also specified size Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); @@ -119,41 +141,27 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati final String n = Thread.currentThread().getName(); StealJobQueue stealJobQueue = new StealJobQueue(COMPARATOR); - this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, - 60, TimeUnit.SECONDS, stealJobQueue, + this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, + TimeUnit.SECONDS, stealJobQueue, new ThreadFactory() { @Override public Thread newThread(Runnable r) { String name = n + "-longCompactions-" + System.currentTimeMillis(); return new Thread(r, name); } - }); + }); this.longCompactions.setRejectedExecutionHandler(new Rejection()); this.longCompactions.prestartAllCoreThreads(); - this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, - 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), + this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, + TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { String name = n + "-shortCompactions-" + System.currentTimeMillis(); return new Thread(r, name); } - }); - this.shortCompactions - .setRejectedExecutionHandler(new Rejection()); - this.splits = (ThreadPoolExecutor) - Executors.newFixedThreadPool(splitThreads, - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-splits-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); - - // compaction throughput controller - this.compactionThroughputController = - CompactionThroughputControllerFactory.create(server, conf); + }); + this.shortCompactions.setRejectedExecutionHandler(new Rejection()); } @Override @@ -236,6 +244,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } } + private void interrupt() { + longCompactions.shutdownNow(); + shortCompactions.shutdownNow(); + } + + private void reInitializeCompactionsExecutors() { + createCompactionExecutors(); + } + private interface CompactionCompleteTracker { default void completed(Store store) { @@ -290,6 +307,21 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati getCompleteTracker(tracker, () -> 1), user); } + @Override + public void switchCompaction(boolean onOrOff) { + if (onOrOff) { + // re-create executor pool if compactions are disabled. + if (!isCompactionsEnabled()) { + LOG.info("Re-Initializing compactions because user switched on compactions"); + reInitializeCompactionsExecutors(); + } + } else { + LOG.info("Interrupting running compactions because user switched off compactions"); + interrupt(); + } + setCompactionsEnabled(onOrOff); + } + private void requestCompactionInternal(HRegion region, String why, int priority, boolean selectNow, CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) throws IOException { @@ -366,6 +398,11 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati private Optional selectCompaction(HRegion region, HStore store, int priority, CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) throws IOException { + // don't even select for compaction if disableCompactions is set to true + if (!isCompactionsEnabled()) { + LOG.info(String.format("User has disabled compactions")); + return Optional.empty(); + } Optional compaction = store.requestCompaction(priority, tracker, user); if (!compaction.isPresent() && region.getRegionInfo() != null) { String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() + @@ -777,4 +814,28 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati public void clearShortCompactionsQueue() { shortCompactions.getQueue().clear(); } + + public boolean isCompactionsEnabled() { + return compactionsEnabled; + } + + public void setCompactionsEnabled(boolean compactionsEnabled) { + this.compactionsEnabled = compactionsEnabled; + this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled)); + } + + /** + * @return the longCompactions thread pool executor + */ + ThreadPoolExecutor getLongCompactions() { + return longCompactions; + } + + /** + * @return the shortCompactions thread pool executor + */ + ThreadPoolExecutor getShortCompactions() { + return shortCompactions; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index baaa145bee1..6c1f7ff9d2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -8218,7 +8218,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi break; } if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION - || op == Operation.COMPACT_REGION) { + || op == Operation.COMPACT_REGION || op == Operation.COMPACT_SWITCH) { // split, merge or compact region doesn't need to check the closing/closed state or lock the // region return; @@ -8539,7 +8539,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi stores.values().forEach(HStore::triggerMajorCompaction); } rsServices.getCompactionRequestor().requestCompaction(this, why, priority, tracker, - RpcServer.getRequestUser().orElse(null)); + RpcServer.getRequestUser().orElse(null)); } @Override @@ -8554,7 +8554,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi store.triggerMajorCompaction(); } rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker, - RpcServer.getRequestUser().orElse(null)); + RpcServer.getRequestUser().orElse(null)); } private void requestFlushIfNeeded() throws RegionTooBusyException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 66a1b71278c..8e8930440f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -163,6 +163,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegion import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; @@ -1658,6 +1660,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + @Override + public CompactionSwitchResponse compactionSwitch(RpcController controller, + CompactionSwitchRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.increment(); + boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled(); + CompactionSwitchResponse response = + CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); + if (prevState == request.getEnabled()) { + // passed in requested state is same as current state. No action required + return response; + } + regionServer.compactSplitThread.switchCompaction(request.getEnabled()); + return response; + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + /** * Flush a region on the region server. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 80b18b8ae2a..ecc2158e7a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -193,7 +193,7 @@ public interface Region extends ConfigurationObserver { */ enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, - REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT + REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT, COMPACT_SWITCH } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java index 76747224ba3..e5f536007e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java @@ -43,4 +43,10 @@ public interface CompactionRequester { */ void requestCompaction(HRegion region, HStore store, String why, int priority, CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException; + + /** + * on/off compaction + */ + void switchCompaction(boolean onOrOff); + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java index 1a156ec09a8..a8ad4244620 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java @@ -26,6 +26,8 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -306,6 +308,42 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { assertEquals(3, countAfterMajorCompaction); } + @Test + public void testCompactionSwitchStates() throws Exception { + // Create a table with regions + byte[] family = Bytes.toBytes("family"); + byte[][] families = {family, Bytes.add(family, Bytes.toBytes("2")), + Bytes.add(family, Bytes.toBytes("3"))}; + createTableWithDefaultConf(tableName, null, families); + loadData(tableName, families, 3000, 8); + List regions = new ArrayList<>(); + TEST_UTIL + .getHBaseCluster() + .getLiveRegionServerThreads() + .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); + CompletableFuture> listCompletableFuture = + admin.compactionSwitch(true, new ArrayList<>()); + Map pairs = listCompletableFuture.get(); + for (Map.Entry p : pairs.entrySet()) { + assertEquals("Default compaction state, expected=enabled actual=disabled", + true, p.getValue()); + } + CompletableFuture> listCompletableFuture1 = + admin.compactionSwitch(false, new ArrayList<>()); + Map pairs1 = listCompletableFuture1.get(); + for (Map.Entry p : pairs1.entrySet()) { + assertEquals("Last compaction state, expected=enabled actual=disabled", + true, p.getValue()); + } + CompletableFuture> listCompletableFuture2 = + admin.compactionSwitch(true, new ArrayList<>()); + Map pairs2 = listCompletableFuture2.get(); + for (Map.Entry p : pairs2.entrySet()) { + assertEquals("Last compaction state, expected=disabled actual=enabled", + false, p.getValue()); + } + } + @Test public void testCompact() throws Exception { compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index d366b67fe58..f0d3b19fe34 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegion import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; @@ -481,6 +483,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { return null; } + @Override + public CompactionSwitchResponse compactionSwitch(RpcController controller, + CompactionSwitchRequest request) throws ServiceException { + return null; + } + @Override public CompactRegionResponse compactRegion(RpcController controller, CompactRegionRequest request) throws ServiceException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 6fe76d87072..a1d76fba3f1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -360,6 +360,75 @@ public class TestCompaction { postFailedCount > preFailedCount); } + /** + * Test no new Compaction requests are generated after calling stop compactions + */ + @Test public void testStopStartCompaction() throws IOException { + // setup a compact/split thread on a mock server + HRegionServer mockServer = Mockito.mock(HRegionServer.class); + Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); + CompactSplit thread = new CompactSplit(mockServer); + Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); + // setup a region/store with some files + HStore store = r.getStore(COLUMN_FAMILY); + createStoreFile(r); + for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { + createStoreFile(r); + } + thread.switchCompaction(false); + thread + .requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, + null); + assertEquals(false, thread.isCompactionsEnabled()); + assertEquals(0, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions() + .getActiveCount()); + thread.switchCompaction(true); + assertEquals(true, thread.isCompactionsEnabled()); + thread + .requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, + null); + assertEquals(1, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions() + .getActiveCount()); + } + + @Test public void testInterruptingRunningCompactions() throws Exception { + // setup a compact/split thread on a mock server + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + WaitThroughPutController.class.getName()); + HRegionServer mockServer = Mockito.mock(HRegionServer.class); + Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); + CompactSplit thread = new CompactSplit(mockServer); + + Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); + + // setup a region/store with some files + HStore store = r.getStore(COLUMN_FAMILY); + int jmax = (int) Math.ceil(15.0 / compactionThreshold); + byte[] pad = new byte[1000]; // 1 KB chunk + for (int i = 0; i < compactionThreshold; i++) { + Table loader = new RegionAsTable(r); + Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); + p.setDurability(Durability.SKIP_WAL); + for (int j = 0; j < jmax; j++) { + p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); + } + HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY)); + loader.put(p); + r.flush(true); + } + HStore s = r.getStore(COLUMN_FAMILY); + int initialFiles = s.getStorefilesCount(); + + thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, + CompactionLifeCycleTracker.DUMMY, null); + + Thread.sleep(3000); + thread.switchCompaction(false); + assertEquals(initialFiles, s.getStorefilesCount()); + //don't mess up future tests + thread.switchCompaction(true); + } + /** * HBASE-7947: Regression test to ensure adding to the correct list in the * {@link CompactSplit} @@ -712,4 +781,20 @@ public class TestCompaction { done.countDown(); } } + + /** + * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction + * finishes. + */ + public static class WaitThroughPutController extends NoLimitThroughputController{ + + public WaitThroughPutController() { + } + + @Override + public long control(String compactionName, long size) throws InterruptedException { + Thread.sleep(6000000); + return 6000000; + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index c385d3dabf6..75d2de3543d 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -85,6 +85,19 @@ module Hbase end end + #---------------------------------------------------------------------------------------------- + # Switch compaction on/off at runtime on a region server + def compaction_switch(on_or_off, regionserver_names) + region_servers = regionserver_names.flatten.compact + servers = java.util.ArrayList.new + if region_servers.any? + region_servers.each do |s| + servers.add(s) + end + end + @admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers) + end + #---------------------------------------------------------------------------------------------- # Gets compaction state for specified table def getCompactionState(table_name) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 9a796587b6b..2c63ed69f9b 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -334,6 +334,7 @@ Shell.load_command_group( is_in_maintenance_mode close_region compact + compaction_switch flush major_compact move diff --git a/hbase-shell/src/main/ruby/shell/commands/compaction_switch.rb b/hbase-shell/src/main/ruby/shell/commands/compaction_switch.rb new file mode 100644 index 00000000000..94db700bc87 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/compaction_switch.rb @@ -0,0 +1,52 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + # Switch compaction for a region server + class CompactionSwitch < Command + def help + <<-EOF + Turn the compaction on or off on regionservers. Disabling compactions will also interrupt + any currently ongoing compactions. This state is ephemeral. The setting will be lost on + restart of the server. Compaction can also be enabled/disabled by modifying configuration + hbase.regionserver.compaction.enabled in hbase-site.xml. + Examples: + To enable compactions on all region servers + hbase> compaction_switch true + To disable compactions on all region servers + hbase> compaction_switch false + To enable compactions on specific region servers + hbase> compaction_switch true 'server2','server1' + To disable compactions on specific region servers + hbase> compaction_switch false 'server2','server1' + NOTE: A server name is its host, port plus startcode. For example: + host187.example.com,60020,1289493121758 + EOF + end + + def command(enable_disable, *server) + formatter.header(%w(['SERVER' 'PREV_STATE'])) + prev_state = admin.compaction_switch(enable_disable, server) + prev_state.each { |k, v| formatter.row([k.getServerName, java.lang.String.valueOf(v)]) } + formatter.footer(prev_state.size) + end + end + end +end diff --git a/src/main/asciidoc/_chapters/architecture.adoc b/src/main/asciidoc/_chapters/architecture.adoc index 19a700a1207..453cf62ae4a 100644 --- a/src/main/asciidoc/_chapters/architecture.adoc +++ b/src/main/asciidoc/_chapters/architecture.adoc @@ -1845,6 +1845,14 @@ See <>. Compactions do not perform region merges. See <> for more information on region merging. +.Compaction Switch +We can switch on and off the compactions at region servers. Switching off compactions will also +interrupt any currently ongoing compactions. It can be done dynamically using the "compaction_switch" +command from hbase shell. If done from the command line, this setting will be lost on restart of the +server. To persist the changes across region servers modify the configuration hbase.regionserver +.compaction.enabled in hbase-site.xml and restart HBase. + + [[compaction.file.selection]] ===== Compaction Policy - HBase 0.96.x and newer