From 35a3c605f2e9edb7e5e1b21d3f15ef351fca0432 Mon Sep 17 00:00:00 2001 From: Mohit Goel Date: Thu, 7 Jun 2018 15:11:28 -0700 Subject: [PATCH] HBASE-6028 Start/Stop compactions at region server level Add switching on/off of compactions. Switching off compactions will also interrupt any currently ongoing compactions. Adds a "compaction_switch" to hbase shell. Switching off compactions will interrupt any currently ongoing compactions. State set from shell will be lost on restart. To persist the changes across region servers modify hbase.regionserver.compaction.enabled in hbase-site.xml and restart. Signed-off-by: Umesh Agashe Signed-off-by: Michael Stack --- .../org/apache/hadoop/hbase/client/Admin.java | 13 ++ .../hadoop/hbase/client/AsyncAdmin.java | 13 ++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 6 + .../hadoop/hbase/client/HBaseAdmin.java | 47 +++++++ .../hbase/client/RawAsyncHBaseAdmin.java | 82 ++++++++++++ .../src/main/resources/hbase-default.xml | 7 + .../src/main/protobuf/Admin.proto | 11 ++ .../hbase/regionserver/CompactSplit.java | 123 +++++++++++++----- .../hadoop/hbase/regionserver/HRegion.java | 6 +- .../hbase/regionserver/RSRpcServices.java | 22 ++++ .../hadoop/hbase/regionserver/Region.java | 2 +- .../compactions/CompactionRequester.java | 6 + .../hbase/client/TestAsyncRegionAdminApi.java | 38 ++++++ .../hadoop/hbase/master/MockRegionServer.java | 8 ++ .../hbase/regionserver/TestCompaction.java | 85 ++++++++++++ hbase-shell/src/main/ruby/hbase/admin.rb | 13 ++ hbase-shell/src/main/ruby/shell.rb | 1 + .../ruby/shell/commands/compaction_switch.rb | 52 ++++++++ src/main/asciidoc/_chapters/architecture.adoc | 8 ++ 19 files changed, 508 insertions(+), 35 deletions(-) create mode 100644 hbase-shell/src/main/ruby/shell/commands/compaction_switch.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 331f2d119e3..01ebb66fd72 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -995,6 +995,19 @@ public interface Admin extends Abortable, Closeable { } } + /** + * Turn the compaction on or off. Disabling compactions will also interrupt any currently ongoing + * compactions. This state is ephemeral. The setting will be lost on restart. Compaction + * can also be enabled/disabled by modifying configuration hbase.regionserver.compaction.enabled + * in hbase-site.xml. + * + * @param switchState Set to true to enable, false to disable. + * @param serverNamesList list of region servers. + * @return Previous compaction states for region servers + */ + Map compactionSwitch(boolean switchState, List serverNamesList) + throws IOException; + /** * Compact all regions on the region server. Asynchronous operation in that this method requests * that a Compaction run and then it returns. It does not wait on the completion of Compaction (it diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 8141e74de75..0e47de82ba1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1240,4 +1240,17 @@ public interface AsyncAdmin { */ CompletableFuture cloneTableSchema(final TableName tableName, final TableName newTableName, final boolean preserveSplits); + + /** + * Turn the compaction on or off. Disabling compactions will also interrupt any currently ongoing + * compactions. This state is ephemeral. The setting will be lost on restart. Compaction + * can also be enabled/disabled by modifying configuration hbase.regionserver.compaction.enabled + * in hbase-site.xml. + * + * @param switchState Set to true to enable, false to disable. + * @param serverNamesList list of region servers. + * @return Previous compaction states for region servers + */ + CompletableFuture> compactionSwitch(boolean switchState, + List serverNamesList); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 5b22668c6b4..8b6c6293bca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -752,4 +752,10 @@ class AsyncHBaseAdmin implements AsyncAdmin { boolean preserveSplits) { return wrap(rawAdmin.cloneTableSchema(tableName, newTableName, preserveSplits)); } + + @Override + public CompletableFuture> compactionSwitch(boolean switchState, + List serverNamesList) { + return wrap(rawAdmin.compactionSwitch(switchState, serverNamesList)); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 7f3abea9a30..8bc26f17e5c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -113,6 +113,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; @@ -1262,6 +1264,51 @@ public class HBaseAdmin implements Admin { compactRegion(regionName, columnFamily, false); } + @Override + public Map compactionSwitch(boolean switchState, List + serverNamesList) throws IOException { + List serverList = new ArrayList<>(); + if (serverNamesList.isEmpty()) { + ClusterMetrics status = getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)); + serverList.addAll(status.getLiveServerMetrics().keySet()); + } else { + for (String regionServerName: serverNamesList) { + ServerName serverName = null; + try { + serverName = ServerName.valueOf(regionServerName); + } catch (Exception e) { + throw new IllegalArgumentException(String.format("Invalid ServerName format: %s", + regionServerName)); + } + if (serverName == null) { + throw new IllegalArgumentException(String.format("Null ServerName: %s", + regionServerName)); + } + serverList.add(serverName); + } + } + Map res = new HashMap<>(serverList.size()); + for (ServerName serverName: serverList) { + boolean prev_state = switchCompact(this.connection.getAdmin(serverName), switchState); + res.put(serverName, prev_state); + } + return res; + } + + private Boolean switchCompact(AdminService.BlockingInterface admin, boolean onOrOff) + throws IOException { + return executeCallable(new RpcRetryingCallable() { + @Override protected Boolean rpcCall(int callTimeout) throws Exception { + HBaseRpcController controller = rpcControllerFactory.newController(); + CompactionSwitchRequest request = + CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(); + CompactionSwitchResponse compactionSwitchResponse = + admin.compactionSwitch(controller, request); + return compactionSwitchResponse.getPrevState(); + } + }); + } + @Override public void compactRegionServer(final ServerName serverName) throws IOException { for (RegionInfo region : getRegions(serverName)) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 0fd0e59ab90..1edfb35df6b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -104,6 +105,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegion import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; @@ -2986,6 +2989,85 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } + @Override + public CompletableFuture> compactionSwitch(boolean switchState, + List serverNamesList) { + CompletableFuture> future = new CompletableFuture<>(); + getRegionServerList(serverNamesList).whenComplete((serverNames, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + //Accessed by multiple threads. + Map serverStates = new ConcurrentHashMap<>(serverNames.size()); + List> futures = new ArrayList<>(serverNames.size()); + serverNames.stream().forEach(serverName -> { + futures.add(switchCompact(serverName, switchState).whenComplete((serverState, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + serverStates.put(serverName, serverState); + } + })); + }); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) + .whenComplete((ret, err3) -> { + if (!future.isCompletedExceptionally()) { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(serverStates); + } + } + }); + }); + return future; + } + + private CompletableFuture> getRegionServerList(List serverNamesList) { + CompletableFuture> future = new CompletableFuture<>(); + if (serverNamesList.isEmpty()) { + CompletableFuture clusterMetricsCompletableFuture = + getClusterMetrics(EnumSet.of(Option + .LIVE_SERVERS)); + clusterMetricsCompletableFuture.whenComplete((clusterMetrics, err) -> { + if (err != null) { + future.completeExceptionally(err); + } else { + future.complete(new ArrayList<>(clusterMetrics.getLiveServerMetrics().keySet())); + } + }); + return future; + } else { + List serverList = new ArrayList<>(); + for (String regionServerName: serverNamesList) { + ServerName serverName = null; + try { + serverName = ServerName.valueOf(regionServerName); + } catch (Exception e) { + future.completeExceptionally(new IllegalArgumentException( + String.format("ServerName format: %s", regionServerName))); + } + if (serverName == null) { + future.completeExceptionally(new IllegalArgumentException( + String.format("Null ServerName: %s", regionServerName))); + } + } + future.complete(serverList); + } + return future; + } + + private CompletableFuture switchCompact(ServerName serverName, boolean onOrOff) { + return this + .newAdminCaller() + .serverName(serverName) + .action((controller, stub) -> this.adminCall(controller, stub, + CompactionSwitchRequest.newBuilder().setEnabled(onOrOff).build(), (s, c, req, done) -> + s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call(); + } + @Override public CompletableFuture balancerSwitch(final boolean on) { return this diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index d7e4476be32..62c82b9b3ae 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -728,6 +728,13 @@ possible configurations would overwhelm and obscure the important. StoreFiles into a single StoreFile. Larger values delay compaction, but when compaction does occur, it takes longer to complete. + + hbase.regionserver.compaction.enabled + true + Enable/disable compactions on by setting true/false. + We can further switch compactions dynamically with the + compaction_switch shell command. + hbase.hstore.flusher.count 2 diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto index ddcc266ea22..c622d589c6f 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto @@ -157,6 +157,14 @@ message CompactRegionRequest { message CompactRegionResponse { } +message CompactionSwitchRequest { + required bool enabled = 1; +} + +message CompactionSwitchResponse { + required bool prev_state = 1; +} + message UpdateFavoredNodesRequest { repeated RegionUpdateInfo update_info = 1; @@ -293,6 +301,9 @@ service AdminService { rpc FlushRegion(FlushRegionRequest) returns(FlushRegionResponse); + rpc CompactionSwitch(CompactionSwitchRequest) + returns(CompactionSwitchResponse); + rpc CompactRegion(CompactRegionRequest) returns(CompactRegionResponse); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 98df9b167ec..fbf73f36ee8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -82,16 +82,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati public static final String REGION_SERVER_REGION_SPLIT_LIMIT = "hbase.regionserver.regionSplitLimit"; public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000; + public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = + "hbase.regionserver.compaction.enabled"; private final HRegionServer server; private final Configuration conf; - - private final ThreadPoolExecutor longCompactions; - private final ThreadPoolExecutor shortCompactions; - private final ThreadPoolExecutor splits; + private volatile ThreadPoolExecutor longCompactions; + private volatile ThreadPoolExecutor shortCompactions; + private volatile ThreadPoolExecutor splits; private volatile ThroughputController compactionThroughputController; + private volatile boolean compactionsEnabled; /** * Splitting should not take place if the total number of regions exceed this. * This is not a hard limit to the number of regions but it is a guideline to @@ -103,15 +105,35 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati CompactSplit(HRegionServer server) { this.server = server; this.conf = server.getConfiguration(); - this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, - DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); + this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true); + createCompactionExecutors(); + createSplitExcecutors(); - int largeThreads = Math.max(1, conf.getInt( - LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); - int smallThreads = conf.getInt( - SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); + // compaction throughput controller + this.compactionThroughputController = + CompactionThroughputControllerFactory.create(server, conf); + } + private void createSplitExcecutors() { + final String n = Thread.currentThread().getName(); int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT); + this.splits = + (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + String name = n + "-splits-" + System.currentTimeMillis(); + return new Thread(r, name); + } + }); + } + + private void createCompactionExecutors() { + this.regionSplitLimit = + conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT); + + int largeThreads = + Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT)); + int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT); // if we have throttle threads, make sure the user also specified size Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0); @@ -119,41 +141,27 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati final String n = Thread.currentThread().getName(); StealJobQueue stealJobQueue = new StealJobQueue(COMPARATOR); - this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, - 60, TimeUnit.SECONDS, stealJobQueue, + this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, + TimeUnit.SECONDS, stealJobQueue, new ThreadFactory() { @Override public Thread newThread(Runnable r) { String name = n + "-longCompactions-" + System.currentTimeMillis(); return new Thread(r, name); } - }); + }); this.longCompactions.setRejectedExecutionHandler(new Rejection()); this.longCompactions.prestartAllCoreThreads(); - this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, - 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), + this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, + TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { String name = n + "-shortCompactions-" + System.currentTimeMillis(); return new Thread(r, name); } - }); - this.shortCompactions - .setRejectedExecutionHandler(new Rejection()); - this.splits = (ThreadPoolExecutor) - Executors.newFixedThreadPool(splitThreads, - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = n + "-splits-" + System.currentTimeMillis(); - return new Thread(r, name); - } - }); - - // compaction throughput controller - this.compactionThroughputController = - CompactionThroughputControllerFactory.create(server, conf); + }); + this.shortCompactions.setRejectedExecutionHandler(new Rejection()); } @Override @@ -236,6 +244,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } } + private void interrupt() { + longCompactions.shutdownNow(); + shortCompactions.shutdownNow(); + } + + private void reInitializeCompactionsExecutors() { + createCompactionExecutors(); + } + private interface CompactionCompleteTracker { default void completed(Store store) { @@ -290,6 +307,21 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati getCompleteTracker(tracker, () -> 1), user); } + @Override + public void switchCompaction(boolean onOrOff) { + if (onOrOff) { + // re-create executor pool if compactions are disabled. + if (!isCompactionsEnabled()) { + LOG.info("Re-Initializing compactions because user switched on compactions"); + reInitializeCompactionsExecutors(); + } + } else { + LOG.info("Interrupting running compactions because user switched off compactions"); + interrupt(); + } + setCompactionsEnabled(onOrOff); + } + private void requestCompactionInternal(HRegion region, String why, int priority, boolean selectNow, CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) throws IOException { @@ -366,6 +398,11 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati private Optional selectCompaction(HRegion region, HStore store, int priority, CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user) throws IOException { + // don't even select for compaction if disableCompactions is set to true + if (!isCompactionsEnabled()) { + LOG.info(String.format("User has disabled compactions")); + return Optional.empty(); + } Optional compaction = store.requestCompaction(priority, tracker, user); if (!compaction.isPresent() && region.getRegionInfo() != null) { String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() + @@ -777,4 +814,28 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati public void clearShortCompactionsQueue() { shortCompactions.getQueue().clear(); } + + public boolean isCompactionsEnabled() { + return compactionsEnabled; + } + + public void setCompactionsEnabled(boolean compactionsEnabled) { + this.compactionsEnabled = compactionsEnabled; + this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled)); + } + + /** + * @return the longCompactions thread pool executor + */ + ThreadPoolExecutor getLongCompactions() { + return longCompactions; + } + + /** + * @return the shortCompactions thread pool executor + */ + ThreadPoolExecutor getShortCompactions() { + return shortCompactions; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index baaa145bee1..6c1f7ff9d2f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -8218,7 +8218,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi break; } if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION - || op == Operation.COMPACT_REGION) { + || op == Operation.COMPACT_REGION || op == Operation.COMPACT_SWITCH) { // split, merge or compact region doesn't need to check the closing/closed state or lock the // region return; @@ -8539,7 +8539,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi stores.values().forEach(HStore::triggerMajorCompaction); } rsServices.getCompactionRequestor().requestCompaction(this, why, priority, tracker, - RpcServer.getRequestUser().orElse(null)); + RpcServer.getRequestUser().orElse(null)); } @Override @@ -8554,7 +8554,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi store.triggerMajorCompaction(); } rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker, - RpcServer.getRequestUser().orElse(null)); + RpcServer.getRequestUser().orElse(null)); } private void requestFlushIfNeeded() throws RegionTooBusyException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 66a1b71278c..8e8930440f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -163,6 +163,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegion import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; @@ -1658,6 +1660,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + @Override + public CompactionSwitchResponse compactionSwitch(RpcController controller, + CompactionSwitchRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.increment(); + boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled(); + CompactionSwitchResponse response = + CompactionSwitchResponse.newBuilder().setPrevState(prevState).build(); + if (prevState == request.getEnabled()) { + // passed in requested state is same as current state. No action required + return response; + } + regionServer.compactSplitThread.switchCompaction(request.getEnabled()); + return response; + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + /** * Flush a region on the region server. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 80b18b8ae2a..ecc2158e7a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -193,7 +193,7 @@ public interface Region extends ConfigurationObserver { */ enum Operation { ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE, - REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT + REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT, COMPACT_SWITCH } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java index 76747224ba3..e5f536007e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java @@ -43,4 +43,10 @@ public interface CompactionRequester { */ void requestCompaction(HRegion region, HStore store, String why, int priority, CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException; + + /** + * on/off compaction + */ + void switchCompaction(boolean onOrOff); + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java index 1a156ec09a8..a8ad4244620 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionAdminApi.java @@ -26,6 +26,8 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -306,6 +308,42 @@ public class TestAsyncRegionAdminApi extends TestAsyncAdminBase { assertEquals(3, countAfterMajorCompaction); } + @Test + public void testCompactionSwitchStates() throws Exception { + // Create a table with regions + byte[] family = Bytes.toBytes("family"); + byte[][] families = {family, Bytes.add(family, Bytes.toBytes("2")), + Bytes.add(family, Bytes.toBytes("3"))}; + createTableWithDefaultConf(tableName, null, families); + loadData(tableName, families, 3000, 8); + List regions = new ArrayList<>(); + TEST_UTIL + .getHBaseCluster() + .getLiveRegionServerThreads() + .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName))); + CompletableFuture> listCompletableFuture = + admin.compactionSwitch(true, new ArrayList<>()); + Map pairs = listCompletableFuture.get(); + for (Map.Entry p : pairs.entrySet()) { + assertEquals("Default compaction state, expected=enabled actual=disabled", + true, p.getValue()); + } + CompletableFuture> listCompletableFuture1 = + admin.compactionSwitch(false, new ArrayList<>()); + Map pairs1 = listCompletableFuture1.get(); + for (Map.Entry p : pairs1.entrySet()) { + assertEquals("Last compaction state, expected=enabled actual=disabled", + true, p.getValue()); + } + CompletableFuture> listCompletableFuture2 = + admin.compactionSwitch(true, new ArrayList<>()); + Map pairs2 = listCompletableFuture2.get(); + for (Map.Entry p : pairs2.entrySet()) { + assertEquals("Last compaction state, expected=disabled actual=enabled", + false, p.getValue()); + } + } + @Test public void testCompact() throws Exception { compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index d366b67fe58..f0d3b19fe34 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -80,6 +80,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegion import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactionSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; @@ -481,6 +483,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { return null; } + @Override + public CompactionSwitchResponse compactionSwitch(RpcController controller, + CompactionSwitchRequest request) throws ServiceException { + return null; + } + @Override public CompactRegionResponse compactRegion(RpcController controller, CompactRegionRequest request) throws ServiceException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 6fe76d87072..a1d76fba3f1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -360,6 +360,75 @@ public class TestCompaction { postFailedCount > preFailedCount); } + /** + * Test no new Compaction requests are generated after calling stop compactions + */ + @Test public void testStopStartCompaction() throws IOException { + // setup a compact/split thread on a mock server + HRegionServer mockServer = Mockito.mock(HRegionServer.class); + Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); + CompactSplit thread = new CompactSplit(mockServer); + Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); + // setup a region/store with some files + HStore store = r.getStore(COLUMN_FAMILY); + createStoreFile(r); + for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) { + createStoreFile(r); + } + thread.switchCompaction(false); + thread + .requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, + null); + assertEquals(false, thread.isCompactionsEnabled()); + assertEquals(0, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions() + .getActiveCount()); + thread.switchCompaction(true); + assertEquals(true, thread.isCompactionsEnabled()); + thread + .requestCompaction(r, store, "test", Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY, + null); + assertEquals(1, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions() + .getActiveCount()); + } + + @Test public void testInterruptingRunningCompactions() throws Exception { + // setup a compact/split thread on a mock server + conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY, + WaitThroughPutController.class.getName()); + HRegionServer mockServer = Mockito.mock(HRegionServer.class); + Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); + CompactSplit thread = new CompactSplit(mockServer); + + Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); + + // setup a region/store with some files + HStore store = r.getStore(COLUMN_FAMILY); + int jmax = (int) Math.ceil(15.0 / compactionThreshold); + byte[] pad = new byte[1000]; // 1 KB chunk + for (int i = 0; i < compactionThreshold; i++) { + Table loader = new RegionAsTable(r); + Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i))); + p.setDurability(Durability.SKIP_WAL); + for (int j = 0; j < jmax; j++) { + p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad); + } + HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY)); + loader.put(p); + r.flush(true); + } + HStore s = r.getStore(COLUMN_FAMILY); + int initialFiles = s.getStorefilesCount(); + + thread.requestCompaction(r, store, "test custom comapction", PRIORITY_USER, + CompactionLifeCycleTracker.DUMMY, null); + + Thread.sleep(3000); + thread.switchCompaction(false); + assertEquals(initialFiles, s.getStorefilesCount()); + //don't mess up future tests + thread.switchCompaction(true); + } + /** * HBASE-7947: Regression test to ensure adding to the correct list in the * {@link CompactSplit} @@ -712,4 +781,20 @@ public class TestCompaction { done.countDown(); } } + + /** + * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction + * finishes. + */ + public static class WaitThroughPutController extends NoLimitThroughputController{ + + public WaitThroughPutController() { + } + + @Override + public long control(String compactionName, long size) throws InterruptedException { + Thread.sleep(6000000); + return 6000000; + } + } } diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index c385d3dabf6..75d2de3543d 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -85,6 +85,19 @@ module Hbase end end + #---------------------------------------------------------------------------------------------- + # Switch compaction on/off at runtime on a region server + def compaction_switch(on_or_off, regionserver_names) + region_servers = regionserver_names.flatten.compact + servers = java.util.ArrayList.new + if region_servers.any? + region_servers.each do |s| + servers.add(s) + end + end + @admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers) + end + #---------------------------------------------------------------------------------------------- # Gets compaction state for specified table def getCompactionState(table_name) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 9a796587b6b..2c63ed69f9b 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -334,6 +334,7 @@ Shell.load_command_group( is_in_maintenance_mode close_region compact + compaction_switch flush major_compact move diff --git a/hbase-shell/src/main/ruby/shell/commands/compaction_switch.rb b/hbase-shell/src/main/ruby/shell/commands/compaction_switch.rb new file mode 100644 index 00000000000..94db700bc87 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/compaction_switch.rb @@ -0,0 +1,52 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + # Switch compaction for a region server + class CompactionSwitch < Command + def help + <<-EOF + Turn the compaction on or off on regionservers. Disabling compactions will also interrupt + any currently ongoing compactions. This state is ephemeral. The setting will be lost on + restart of the server. Compaction can also be enabled/disabled by modifying configuration + hbase.regionserver.compaction.enabled in hbase-site.xml. + Examples: + To enable compactions on all region servers + hbase> compaction_switch true + To disable compactions on all region servers + hbase> compaction_switch false + To enable compactions on specific region servers + hbase> compaction_switch true 'server2','server1' + To disable compactions on specific region servers + hbase> compaction_switch false 'server2','server1' + NOTE: A server name is its host, port plus startcode. For example: + host187.example.com,60020,1289493121758 + EOF + end + + def command(enable_disable, *server) + formatter.header(%w(['SERVER' 'PREV_STATE'])) + prev_state = admin.compaction_switch(enable_disable, server) + prev_state.each { |k, v| formatter.row([k.getServerName, java.lang.String.valueOf(v)]) } + formatter.footer(prev_state.size) + end + end + end +end diff --git a/src/main/asciidoc/_chapters/architecture.adoc b/src/main/asciidoc/_chapters/architecture.adoc index 19a700a1207..453cf62ae4a 100644 --- a/src/main/asciidoc/_chapters/architecture.adoc +++ b/src/main/asciidoc/_chapters/architecture.adoc @@ -1845,6 +1845,14 @@ See <>. Compactions do not perform region merges. See <> for more information on region merging. +.Compaction Switch +We can switch on and off the compactions at region servers. Switching off compactions will also +interrupt any currently ongoing compactions. It can be done dynamically using the "compaction_switch" +command from hbase shell. If done from the command line, this setting will be lost on restart of the +server. To persist the changes across region servers modify the configuration hbase.regionserver +.compaction.enabled in hbase-site.xml and restart HBase. + + [[compaction.file.selection]] ===== Compaction Policy - HBase 0.96.x and newer