diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 94550df8092..ba96a5b6ed3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -158,7 +158,7 @@ public interface RegionObserver { /** * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of * available candidates. To alter the files used for compaction, you may mutate the passed in list - * of candidates. + * of candidates. If you remove all the candidates then the compaction will be canceled. * @param c the environment provided by the region server * @param store the store where compaction is being requested * @param candidates the store files currently available for compaction @@ -183,18 +183,12 @@ public interface RegionObserver { /** * Called prior to writing the {@link StoreFile}s selected for compaction into a new - * {@code StoreFile}. To override or modify the compaction process, implementing classes have two - * options: - * + * {@code StoreFile}. + *

+ * To override or modify the compaction process, implementing classes can wrap the provided + * {@link InternalScanner} with a custom implementation that is returned from this method. The + * custom scanner can then inspect {@link org.apache.hadoop.hbase.Cell}s from the wrapped scanner, + * applying its own policy to what gets written. * @param c the environment provided by the region server * @param store the store being compacted * @param scanner the scanner over existing data used in the store file rewriting @@ -206,8 +200,7 @@ public interface RegionObserver { */ default InternalScanner preCompact(ObserverContext c, Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, - CompactionRequest request) - throws IOException { + CompactionRequest request) throws IOException { return scanner; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index f37e49e50d1..b82b3465cce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -34,6 +34,8 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; @@ -60,7 +63,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; * Compact region on request and then run split if appropriate */ @InterfaceAudience.Private -public class CompactSplit implements PropagatingConfigurationObserver { +public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { private static final Log LOG = LogFactory.getLog(CompactSplit.class); // Configuration key for the large compaction threads. @@ -99,7 +102,6 @@ public class CompactSplit implements PropagatingConfigurationObserver { /** @param server */ CompactSplit(HRegionServer server) { - super(); this.server = server; this.conf = server.getConfiguration(); this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, @@ -235,14 +237,68 @@ public class CompactSplit implements PropagatingConfigurationObserver { } } - public synchronized void requestCompaction(HRegion region, String why, int priority, - CompactionLifeCycleTracker tracker, User user) throws IOException { - requestCompactionInternal(region, why, priority, true, tracker, user); + // A compaction life cycle tracker to trace the execution of all the compactions triggered by one + // request and delegate to the source CompactionLifeCycleTracker. It will call completed method if + // all the compactions are finished. + private static final class AggregatingCompactionLifeCycleTracker + implements CompactionLifeCycleTracker { + + private final CompactionLifeCycleTracker tracker; + + private final AtomicInteger remaining; + + public AggregatingCompactionLifeCycleTracker(CompactionLifeCycleTracker tracker, + int numberOfStores) { + this.tracker = tracker; + this.remaining = new AtomicInteger(numberOfStores); + } + + private void tryCompleted() { + if (remaining.decrementAndGet() == 0) { + tracker.completed(); + } + } + + @Override + public void notExecuted(Store store, String reason) { + tracker.notExecuted(store, reason); + tryCompleted(); + } + + @Override + public void beforeExecution(Store store) { + tracker.beforeExecution(store); + } + + @Override + public void afterExecution(Store store) { + tracker.afterExecution(store); + tryCompleted(); + } } + private CompactionLifeCycleTracker wrap(CompactionLifeCycleTracker tracker, + IntSupplier numberOfStores) { + if (tracker == CompactionLifeCycleTracker.DUMMY) { + // a simple optimization to avoid creating unnecessary objects as usually we do not care about + // the life cycle of a compaction. + return tracker; + } else { + return new AggregatingCompactionLifeCycleTracker(tracker, numberOfStores.getAsInt()); + } + } + + @Override + public synchronized void requestCompaction(HRegion region, String why, int priority, + CompactionLifeCycleTracker tracker, User user) throws IOException { + requestCompactionInternal(region, why, priority, true, + wrap(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user); + } + + @Override public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority, CompactionLifeCycleTracker tracker, User user) throws IOException { - requestCompactionInternal(region, store, why, priority, true, tracker, user); + requestCompactionInternal(region, store, why, priority, true, wrap(tracker, () -> 1), user); } private void requestCompactionInternal(HRegion region, String why, int priority, @@ -259,6 +315,17 @@ public class CompactSplit implements PropagatingConfigurationObserver { !region.getTableDescriptor().isCompactionEnabled())) { return; } + RegionServerSpaceQuotaManager spaceQuotaManager = + this.server.getRegionServerSpaceQuotaManager(); + if (spaceQuotaManager != null && + spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) { + String reason = "Ignoring compaction request for " + region + + " as an active space quota violation " + " policy disallows compactions."; + tracker.notExecuted(store, reason); + LOG.debug(reason); + return; + } + Optional compaction; if (selectNow) { compaction = selectCompaction(region, store, priority, tracker, user); @@ -270,17 +337,6 @@ public class CompactSplit implements PropagatingConfigurationObserver { compaction = Optional.empty(); } - RegionServerSpaceQuotaManager spaceQuotaManager = - this.server.getRegionServerSpaceQuotaManager(); - if (spaceQuotaManager != null && - spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring compaction request for " + region + - " as an active space quota violation " + " policy disallows compactions."); - } - return; - } - ThreadPoolExecutor pool; if (selectNow) { // compaction.get is safe as we will just return if selectNow is true but no compaction is @@ -315,9 +371,11 @@ public class CompactSplit implements PropagatingConfigurationObserver { private Optional selectCompaction(HRegion region, HStore store, int priority, CompactionLifeCycleTracker tracker, User user) throws IOException { Optional compaction = store.requestCompaction(priority, tracker, user); - if (!compaction.isPresent() && LOG.isDebugEnabled() && region.getRegionInfo() != null) { - LOG.debug("Not compacting " + region.getRegionInfo().getRegionNameAsString() + - " because compaction request was cancelled"); + if (!compaction.isPresent() && region.getRegionInfo() != null) { + String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() + + " because compaction request was cancelled"; + tracker.notExecuted(store, reason); + LOG.debug(reason); } return compaction; } @@ -454,7 +512,6 @@ public class CompactSplit implements PropagatingConfigurationObserver { public CompactionRunner(HStore store, HRegion region, Optional compaction, ThreadPoolExecutor parent, User user) { - super(); this.store = store; this.region = region; this.compaction = compaction; @@ -462,7 +519,7 @@ public class CompactSplit implements PropagatingConfigurationObserver { : store.getCompactPriority(); this.parent = parent; this.user = user; - this.time = System.currentTimeMillis(); + this.time = EnvironmentEdgeManager.currentTime(); } @Override @@ -520,7 +577,7 @@ public class CompactSplit implements PropagatingConfigurationObserver { // Finally we can compact something. assert c != null; - c.getRequest().getTracker().beforeExecute(store); + c.getRequest().getTracker().beforeExecution(store); try { // Note: please don't put single-compaction logic here; // put it into region/store/etc. This is CST logic. @@ -553,7 +610,7 @@ public class CompactSplit implements PropagatingConfigurationObserver { region.reportCompactionRequestFailure(); server.checkFileSystem(); } finally { - c.getRequest().getTracker().afterExecute(store); + c.getRequest().getTracker().afterExecution(store); region.decrementCompactionsQueuedCount(); LOG.debug("CompactSplitThread Status: " + CompactSplit.this); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 84b0d6a8852..9022e1f7edf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1952,11 +1952,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected void doRegionCompactionPrep() throws IOException { } - @Override - public void triggerMajorCompaction() throws IOException { - stores.values().forEach(HStore::triggerMajorCompaction); - } - /** * Synchronously compact all stores in the region. *

This operation could block for a long time, so don't call it from a @@ -1972,7 +1967,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ public void compact(boolean majorCompaction) throws IOException { if (majorCompaction) { - triggerMajorCompaction(); + stores.values().forEach(HStore::triggerMajorCompaction); } for (HStore s : stores.values()) { Optional compaction = s.requestCompaction(); @@ -8212,16 +8207,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker, - User user) throws IOException { - ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this, why, priority, tracker, - user); + public void requestCompaction(String why, int priority, boolean major, + CompactionLifeCycleTracker tracker) throws IOException { + if (major) { + stores.values().forEach(HStore::triggerMajorCompaction); + } + rsServices.getCompactionRequestor().requestCompaction(this, why, priority, tracker, + RpcServer.getRequestUser().orElse(null)); } @Override - public void requestCompaction(byte[] family, String why, int priority, - CompactionLifeCycleTracker tracker, User user) throws IOException { - ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this, - Preconditions.checkNotNull(stores.get(family)), why, priority, tracker, user); + public void requestCompaction(byte[] family, String why, int priority, boolean major, + CompactionLifeCycleTracker tracker) throws IOException { + HStore store = stores.get(family); + if (store == null) { + throw new NoSuchColumnFamilyException("column family " + Bytes.toString(family) + + " does not exist in region " + getRegionInfo().getRegionNameAsString()); + } + if (major) { + store.triggerMajorCompaction(); + } + rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker, + RpcServer.getRequestUser().orElse(null)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d396b3e981c..2c0bd035f59 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; @@ -1686,9 +1687,9 @@ public class HRegionServer extends HasThread implements int totalStaticBloomSizeKB = 0; long totalCompactingKVs = 0; long currentCompactedKVs = 0; - List storeList = r.getStores(); + List storeList = r.getStores(); stores += storeList.size(); - for (Store store : storeList) { + for (HStore store : storeList) { storefiles += store.getStorefilesCount(); storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed() / 1024 / 1024); storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024); @@ -2779,6 +2780,11 @@ public class HRegionServer extends HasThread implements return this.cacheFlusher; } + @Override + public CompactionRequester getCompactionRequestor() { + return this.compactSplitThread; + } + /** * Get the top N most loaded regions this server is serving so we can tell the * master which regions it can reallocate if we're overloaded. TODO: actually diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 83b5561d57e..2ec5437ebf7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1625,7 +1625,10 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles()); } - @Override + /** + * getter for CompactionProgress object + * @return CompactionProgress object; can be null + */ public CompactionProgress getCompactionProgress() { return this.storeEngine.getCompactor().getProgress(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index a565eeb557d..045838a6507 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1533,7 +1533,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @throws ServiceException */ @Override - @QosPriority(priority=HConstants.ADMIN_QOS) + @QosPriority(priority = HConstants.ADMIN_QOS) public CompactRegionResponse compactRegion(final RpcController controller, final CompactRegionRequest request) throws ServiceException { try { @@ -1551,41 +1551,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } region.startRegionOperation(Operation.COMPACT_REGION); LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); - boolean major = false; - byte [] family = null; - HStore store = null; + boolean major = request.hasMajor() && request.getMajor(); if (request.hasFamily()) { - family = request.getFamily().toByteArray(); - store = region.getStore(family); - if (store == null) { - throw new ServiceException(new DoNotRetryIOException("column family " + - Bytes.toString(family) + " does not exist in region " + - region.getRegionInfo().getRegionNameAsString())); - } - } - if (request.hasMajor()) { - major = request.getMajor(); - } - if (major) { - if (family != null) { - store.triggerMajorCompaction(); - } else { - region.triggerMajorCompaction(); - } - } - - String familyLogMsg = (family != null)?" for column family: " + Bytes.toString(family):""; - if (LOG.isTraceEnabled()) { - LOG.trace("User-triggered compaction requested for region " - + region.getRegionInfo().getRegionNameAsString() + familyLogMsg); - } - String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg; - if (family != null) { - regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER, - CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser().orElse(null)); + byte[] family = request.getFamily().toByteArray(); + String log = "User-triggered " + (major ? "major " : "") + "compaction for region " + + region.getRegionInfo().getRegionNameAsString() + " and family " + + Bytes.toString(family); + LOG.trace(log); + region.requestCompaction(family, log, Store.PRIORITY_USER, major, + CompactionLifeCycleTracker.DUMMY); } else { - regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER, - CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser().orElse(null)); + String log = "User-triggered " + (major ? "major " : "") + "compaction for region " + + region.getRegionInfo().getRegionNameAsString(); + LOG.trace(log); + region.requestCompaction(log, Store.PRIORITY_USER, major, CompactionLifeCycleTracker.DUMMY); } return CompactRegionResponse.newBuilder().build(); } catch (IOException ie) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 79012ea92df..0c93ed13cd4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; @@ -43,7 +42,6 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; -import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -454,15 +452,6 @@ public interface Region extends ConfigurationObserver { // Flushes, compactions, splits, etc. // Wizards only, please - /** - * Trigger major compaction on all stores in the region. - *

- * Compaction will be performed asynchronously to this call by the RegionServer's - * CompactSplitThread. - * @throws IOException - */ - void triggerMajorCompaction() throws IOException; - /** * @return if a given region is in compaction now. */ @@ -471,12 +460,12 @@ public interface Region extends ConfigurationObserver { /** * Request compaction on this region. */ - void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker, User user) - throws IOException; + void requestCompaction(String why, int priority, boolean major, + CompactionLifeCycleTracker tracker) throws IOException; /** * Request compaction for the given family */ - void requestCompaction(byte[] family, String why, int priority, - CompactionLifeCycleTracker tracker, User user) throws IOException; + void requestCompaction(byte[] family, String why, int priority, boolean major, + CompactionLifeCycleTracker tracker) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index d8e8ac57c8f..af883a38a8c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.wal.WAL; import org.apache.yetus.audience.InterfaceAudience; @@ -59,10 +60,17 @@ public interface RegionServerServices extends Server, OnlineRegions, FavoredNode List getWALs() throws IOException; /** - * @return Implementation of {@link FlushRequester} or null. + * @return Implementation of {@link FlushRequester} or null. Usually it will not be null unless + * during intialization. */ FlushRequester getFlushRequester(); + /** + * @return Implementation of {@link CompactionRequester} or null. Usually it will not be null + * unless during intialization. + */ + CompactionRequester getCompactionRequestor(); + /** * @return the RegionServerAccounting for this Region Server */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index b6bad6f8139..d60de6bc8c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -61,12 +60,6 @@ public interface Store { FileSystem getFileSystem(); - /** - * getter for CompactionProgress object - * @return CompactionProgress object; can be null - */ - CompactionProgress getCompactionProgress(); - /** * Tests whether we should run a major compaction. For example, if the configured major compaction * interval is reached. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java index 38fec7e6d2b..dfff2f980fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionLifeCycleTracker.java @@ -32,13 +32,19 @@ public interface CompactionLifeCycleTracker { static CompactionLifeCycleTracker DUMMY = new CompactionLifeCycleTracker() { }; + /** + * Called if the compaction request is failed for some reason. + */ + default void notExecuted(Store store, String reason) { + } + /** * Called before compaction is executed by CompactSplitThread. *

* Requesting compaction on a region can lead to multiple compactions on different stores, so we * will pass the {@link Store} in to tell you the store we operate on. */ - default void beforeExecute(Store store) { + default void beforeExecution(Store store) { } /** @@ -47,6 +53,15 @@ public interface CompactionLifeCycleTracker { * Requesting compaction on a region can lead to multiple compactions on different stores, so we * will pass the {@link Store} in to tell you the store we operate on. */ - default void afterExecute(Store store) { + default void afterExecution(Store store) { + } + + /** + * Called after all the requested compactions are completed. + *

+ * The compaction scheduling is per Store so if you request a compaction on a region it may lead + * to multiple compactions. + */ + default void completed() { } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java new file mode 100644 index 00000000000..76747224ba3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; + +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Request a compaction. + */ +@InterfaceAudience.Private +public interface CompactionRequester { + + /** + * Request compaction on all the stores of the given region. + */ + void requestCompaction(HRegion region, String why, int priority, + CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException; + + /** + * Request compaction on the given store. + */ + void requestCompaction(HRegion region, HStore store, String why, int priority, + CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException; +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index c7397159fbb..58a0055ecf3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; @@ -150,6 +151,11 @@ public class MockRegionServerServices implements RegionServerServices { return null; } + @Override + public CompactionRequester getCompactionRequestor() { + return null; + } + @Override public ClusterConnection getConnection() { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 6ca7076dc6c..119c22547f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; @@ -219,7 +220,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public boolean isStopped() { - // TODO Auto-generated method stub return false; } @@ -264,18 +264,15 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public void addRegion(HRegion r) { - // TODO Auto-generated method stub } @Override public boolean removeRegion(HRegion r, ServerName destination) { - // TODO Auto-generated method stub return false; } @Override public HRegion getRegion(String encodedRegionName) { - // TODO Auto-generated method stub return null; } @@ -316,13 +313,14 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public FlushRequester getFlushRequester() { - // TODO Auto-generated method stub return null; } - + @Override + public CompactionRequester getCompactionRequestor() { + return null; + } @Override public RegionServerAccounting getRegionServerAccounting() { - // TODO Auto-generated method stub return null; } @@ -334,24 +332,20 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public void postOpenDeployTasks(PostOpenDeployContext context) throws KeeperException, IOException { - // TODO Auto-generated method stub } @Override public RpcServerInterface getRpcServer() { - // TODO Auto-generated method stub return null; } @Override public ConcurrentSkipListMap getRegionsInTransitionInRS() { - // TODO Auto-generated method stub return null; } @Override public FileSystem getFileSystem() { - // TODO Auto-generated method stub return null; } @@ -371,7 +365,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public MutateResponse mutate(RpcController controller, MutateRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @@ -410,7 +403,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public BulkLoadHFileResponse bulkLoadHFile(RpcController controller, BulkLoadHFileRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @@ -423,7 +415,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public ClientProtos.MultiResponse multi( RpcController controller, MultiRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @@ -451,14 +442,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public GetStoreFileResponse getStoreFile(RpcController controller, GetStoreFileRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @Override public GetOnlineRegionResponse getOnlineRegion(RpcController controller, GetOnlineRegionRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @@ -470,74 +459,63 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @Override public WarmupRegionResponse warmupRegion(RpcController controller, WarmupRegionRequest request) throws ServiceException { - //TODO Auto-generated method stub return null; } @Override public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @Override public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @Override public CompactRegionResponse compactRegion(RpcController controller, CompactRegionRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @Override public ReplicateWALEntryResponse replicateWALEntry(RpcController controller, ReplicateWALEntryRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @Override public RollWALWriterResponse rollWALWriter(RpcController controller, RollWALWriterRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @Override public GetServerInfoResponse getServerInfo(RpcController controller, GetServerInfoRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @Override public StopServerResponse stopServer(RpcController controller, StopServerRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @Override public List getRegions(TableName tableName) throws IOException { - // TODO Auto-generated method stub return null; } @Override public Leases getLeases() { - // TODO Auto-generated method stub return null; } @@ -575,13 +553,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { public ReplicateWALEntryResponse replay(RpcController controller, ReplicateWALEntryRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } @Override public Map getRecoveringRegions() { - // TODO Auto-generated method stub return null; } @@ -603,14 +579,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public boolean registerService(com.google.protobuf.Service service) { - // TODO Auto-generated method stub return false; } @Override public CoprocessorServiceResponse execRegionServerService(RpcController controller, CoprocessorServiceRequest request) throws ServiceException { - // TODO Auto-generated method stub return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index a9f331e6f41..63168099256 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -706,7 +706,7 @@ public class TestCompaction { } @Override - public void afterExecute(Store store) { + public void afterExecution(Store store) { done.countDown(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java new file mode 100644 index 00000000000..70d3463d2f3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionLifeCycleTracker.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot; +import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus; +import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Confirm that the function of CompactionLifeCycleTracker is OK as we do not use it in our own + * code. + */ +@Category({ CoprocessorTests.class, MediumTests.class }) +public class TestCompactionLifeCycleTracker { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName NAME = + TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName()); + + private static final byte[] CF1 = Bytes.toBytes("CF1"); + + private static final byte[] CF2 = Bytes.toBytes("CF2"); + + private static final byte[] QUALIFIER = Bytes.toBytes("CQ"); + + private HRegion region; + + private static CompactionLifeCycleTracker TRACKER = null; + + // make sure that we pass the correct CompactionLifeCycleTracker to CP hooks. + public static final class CompactionObserver implements RegionObserver { + + @Override + public void preCompactSelection(ObserverContext c, Store store, + List candidates, CompactionLifeCycleTracker tracker) + throws IOException { + if (TRACKER != null) { + assertSame(tracker, TRACKER); + } + } + + @Override + public void postCompactSelection(ObserverContext c, Store store, + List selected, CompactionLifeCycleTracker tracker, + CompactionRequest request) { + if (TRACKER != null) { + assertSame(tracker, TRACKER); + } + } + + @Override + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + if (TRACKER != null) { + assertSame(tracker, TRACKER); + } + return scanner; + } + + @Override + public void postCompact(ObserverContext c, Store store, + StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) + throws IOException { + if (TRACKER != null) { + assertSame(tracker, TRACKER); + } + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.getConfiguration().setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); + UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws IOException { + UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(NAME) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1)) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF2)) + .addCoprocessor(CompactionObserver.class.getName()).build()); + try (Table table = UTIL.getConnection().getTable(NAME)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addImmutable(CF1, QUALIFIER, Bytes.toBytes(i))); + } + UTIL.getAdmin().flush(NAME); + for (int i = 100; i < 200; i++) { + table.put(new Put(Bytes.toBytes(i)).addImmutable(CF1, QUALIFIER, Bytes.toBytes(i))); + } + UTIL.getAdmin().flush(NAME); + } + region = UTIL.getHBaseCluster().getRegions(NAME).get(0); + assertEquals(2, region.getStore(CF1).getStorefilesCount()); + assertEquals(0, region.getStore(CF2).getStorefilesCount()); + } + + @After + public void tearDown() throws IOException { + region = null; + TRACKER = null; + UTIL.deleteTable(NAME); + } + + private static final class Tracker implements CompactionLifeCycleTracker { + + final List> notExecutedStores = new ArrayList<>(); + + final List beforeExecuteStores = new ArrayList<>(); + + final List afterExecuteStores = new ArrayList<>(); + + private boolean completed = false; + + @Override + public void notExecuted(Store store, String reason) { + notExecutedStores.add(Pair.newPair(store, reason)); + } + + @Override + public void beforeExecution(Store store) { + beforeExecuteStores.add(store); + } + + @Override + public void afterExecution(Store store) { + afterExecuteStores.add(store); + } + + @Override + public synchronized void completed() { + completed = true; + notifyAll(); + } + + public synchronized void await() throws InterruptedException { + while (!completed) { + wait(); + } + } + } + + @Test + public void testRequestOnRegion() throws IOException, InterruptedException { + Tracker tracker = new Tracker(); + TRACKER = tracker; + region.requestCompaction("test", Store.PRIORITY_USER, false, tracker); + tracker.await(); + assertEquals(1, tracker.notExecutedStores.size()); + assertEquals(Bytes.toString(CF2), + tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName()); + assertThat(tracker.notExecutedStores.get(0).getSecond(), + containsString("compaction request was cancelled")); + + assertEquals(1, tracker.beforeExecuteStores.size()); + assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName()); + + assertEquals(1, tracker.afterExecuteStores.size()); + assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName()); + } + + @Test + public void testRequestOnStore() throws IOException, InterruptedException { + Tracker tracker = new Tracker(); + TRACKER = tracker; + region.requestCompaction(CF1, "test", Store.PRIORITY_USER, false, tracker); + tracker.await(); + assertTrue(tracker.notExecutedStores.isEmpty()); + assertEquals(1, tracker.beforeExecuteStores.size()); + assertEquals(Bytes.toString(CF1), tracker.beforeExecuteStores.get(0).getColumnFamilyName()); + assertEquals(1, tracker.afterExecuteStores.size()); + assertEquals(Bytes.toString(CF1), tracker.afterExecuteStores.get(0).getColumnFamilyName()); + + tracker = new Tracker(); + TRACKER = tracker; + region.requestCompaction(CF2, "test", Store.PRIORITY_USER, false, tracker); + tracker.await(); + assertEquals(1, tracker.notExecutedStores.size()); + assertEquals(Bytes.toString(CF2), + tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName()); + assertThat(tracker.notExecutedStores.get(0).getSecond(), + containsString("compaction request was cancelled")); + assertTrue(tracker.beforeExecuteStores.isEmpty()); + assertTrue(tracker.afterExecuteStores.isEmpty()); + } + + @Test + public void testSpaceQuotaViolation() throws IOException, InterruptedException { + region.getRegionServerServices().getRegionServerSpaceQuotaManager().enforceViolationPolicy(NAME, + new SpaceQuotaSnapshot(new SpaceQuotaStatus(SpaceViolationPolicy.NO_WRITES_COMPACTIONS), 10L, + 100L)); + Tracker tracker = new Tracker(); + TRACKER = tracker; + region.requestCompaction("test", Store.PRIORITY_USER, false, tracker); + tracker.await(); + assertEquals(2, tracker.notExecutedStores.size()); + tracker.notExecutedStores.sort((p1, p2) -> p1.getFirst().getColumnFamilyName() + .compareTo(p2.getFirst().getColumnFamilyName())); + + assertEquals(Bytes.toString(CF1), + tracker.notExecutedStores.get(0).getFirst().getColumnFamilyName()); + assertThat(tracker.notExecutedStores.get(0).getSecond(), + containsString("space quota violation")); + + assertEquals(Bytes.toString(CF2), + tracker.notExecutedStores.get(1).getFirst().getColumnFamilyName()); + assertThat(tracker.notExecutedStores.get(1).getSecond(), + containsString("space quota violation")); + + assertTrue(tracker.beforeExecuteStores.isEmpty()); + assertTrue(tracker.afterExecuteStores.isEmpty()); + } +}