diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index ac76edbdeae..a7759c53c78 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -265,7 +265,14 @@ public enum EventType {
*
* RS_REGION_REPLICA_FLUSH
*/
- RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS);
+ RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS),
+
+ /**
+ * RS compacted files discharger
+ *
+ * RS_COMPACTED_FILES_DISCHARGER
+ */
+ RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER);
private final int code;
private final ExecutorType executor;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index d0f6beedbb6..5a161497b91 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -46,7 +46,8 @@ public enum ExecutorType {
RS_CLOSE_META (25),
RS_PARALLEL_SEEK (26),
RS_LOG_REPLAY_OPS (27),
- RS_REGION_REPLICA_FLUSH_OPS (28);
+ RS_REGION_REPLICA_FLUSH_OPS (28),
+ RS_COMPACTED_FILES_DISCHARGER (29);
ExecutorType(int value) {}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 410fb39f087..335b672aac6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -78,7 +79,8 @@ public class ExecutorService {
* started with the same name, this throws a RuntimeException.
* @param name Name of the service to start.
*/
- void startExecutorService(String name, int maxThreads) {
+ @VisibleForTesting
+ public void startExecutorService(String name, int maxThreads) {
if (this.executorMap.get(name) != null) {
throw new RuntimeException("An executor service with the name " + name +
" is already running!");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java
new file mode 100644
index 00000000000..02160d8fa6c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+
+/**
+ * Event handler that handles the removal and archival of the compacted hfiles
+ */
+@InterfaceAudience.Private
+public class CompactedHFilesDischargeHandler extends EventHandler {
+
+ private HStore store;
+
+ public CompactedHFilesDischargeHandler(Server server, EventType eventType, HStore store) {
+ super(server, eventType);
+ this.store = store;
+ }
+
+ @Override
+ public void process() throws IOException {
+ this.store.closeAndArchiveCompactedFiles();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
new file mode 100644
index 00000000000..c4974cf9237
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A chore service that periodically cleans up the compacted files when there are no active readers
+ * using those compacted files and also helps in clearing the block cache with these compacted
+ * file entries
+ */
+@InterfaceAudience.Private
+public class CompactedHFilesDischarger extends ScheduledChore {
+ private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
+ private RegionServerServices regionServerServices;
+ // Default is to use executor
+ @VisibleForTesting
+ private boolean useExecutor = true;
+
+ /**
+ * @param period the period of time to sleep between each run
+ * @param stopper the stopper
+ * @param regionServerServices the region server that starts this chore
+ */
+ public CompactedHFilesDischarger(final int period, final Stoppable stopper,
+ final RegionServerServices regionServerServices) {
+ // Need to add the config classes
+ super("CompactedHFilesCleaner", stopper, period);
+ this.regionServerServices = regionServerServices;
+ }
+
+ /**
+ * @param period the period of time to sleep between each run
+ * @param stopper the stopper
+ * @param regionServerServices the region server that starts this chore
+ * @param useExecutor true if to use the region server's executor service, false otherwise
+ */
+ @VisibleForTesting
+ public CompactedHFilesDischarger(final int period, final Stoppable stopper,
+ final RegionServerServices regionServerServices, boolean useExecutor) {
+ // Need to add the config classes
+ this(period, stopper, regionServerServices);
+ this.useExecutor = useExecutor;
+ }
+
+ @Override
+ public void chore() {
+ List onlineRegions = regionServerServices.getOnlineRegions();
+ if (onlineRegions != null) {
+ for (Region region : onlineRegions) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Started the compacted hfiles cleaner for the region " + region.getRegionInfo());
+ }
+ for (Store store : region.getStores()) {
+ try {
+ if (useExecutor && regionServerServices != null) {
+ CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
+ (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
+ (HStore) store);
+ regionServerServices.getExecutorService().submit(handler);
+ } else {
+ // call synchronously if the RegionServerServices are not
+ // available
+ store.closeAndArchiveCompactedFiles();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Completed archiving the compacted files for the region "
+ + region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while trying to close and archive the comapcted store "
+ + "files of the store " + store.getColumnFamilyName() + " in the" + " region "
+ + region.getRegionInfo(), e);
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
+ }
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 38404c7f25c..d059cd07576 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -74,7 +74,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
@@ -151,7 +150,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
@@ -814,20 +812,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Initialize all the HStores
status.setStatus("Initializing all the Stores");
long maxSeqId = initializeStores(reporter, status);
- // Start the CompactedHFilesDischarger here. This chore helps to remove the compacted files
- // that will no longer be used in reads.
- if (this.getRegionServerServices() != null) {
- ChoreService choreService = this.getRegionServerServices().getChoreService();
- if (choreService != null) {
- // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
- // 2 mins so that compacted files can be archived before the TTLCleaner runs
- int cleanerInterval =
- conf.getInt("hbase.hfile.compactions.cleaner.interval", 2 * 60 * 1000);
- this.compactedFileDischarger =
- new CompactedHFilesDischarger(cleanerInterval, this.getRegionServerServices(), this);
- choreService.scheduleChore(compactedFileDischarger);
- }
- }
this.mvcc.advanceTo(maxSeqId);
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
// Recover any edits if available.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 00046bafdb0..b2cc78abc32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -134,6 +134,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -484,6 +485,8 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
*/
protected final ConfigurationManager configurationManager;
+ private CompactedHFilesDischarger compactedFileDischarger;
+
/**
* Starts a HRegionServer at the default location.
* @param conf
@@ -615,6 +618,16 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
}
});
}
+ // Create the CompactedFileDischarger chore service. This chore helps to
+ // remove the compacted files
+ // that will no longer be used in reads.
+ // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
+ // 2 mins so that compacted files can be archived before the TTLCleaner runs
+ int cleanerInterval =
+ conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
+ this.compactedFileDischarger =
+ new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this);
+ choreService.scheduleChore(compactedFileDischarger);
}
protected TableDescriptors getFsTableDescriptors() throws IOException {
@@ -1716,7 +1729,9 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
}
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
-
+ // Start the threads for compacted files discharger
+ this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
+ conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
@@ -2725,6 +2740,15 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
return tableRegions;
}
+ @Override
+ public List getOnlineRegions() {
+ List allRegions = new ArrayList();
+ synchronized (this.onlineRegions) {
+ // Return a clone copy of the onlineRegions
+ allRegions.addAll(onlineRegions.values());
+ }
+ return allRegions;
+ }
/**
* Gets the online tables in this RS.
* This method looks at the in-memory onlineRegions.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 8d66696294a..9ebdaeef752 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -37,9 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -76,7 +74,6 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -91,7 +88,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@@ -139,8 +135,6 @@ public class HStore implements Store {
static int closeCheckInterval = 0;
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;
- private ThreadPoolExecutor compactionCleanerthreadPoolExecutor = null;
- private CompletionService completionService = null;
/**
* RWLock for store operations.
@@ -274,10 +268,6 @@ public class HStore implements Store {
"hbase.hstore.flush.retries.number must be > 0, not "
+ flushRetriesNumber);
}
- compactionCleanerthreadPoolExecutor = getThreadPoolExecutor(
- conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 10));
- completionService =
- new ExecutorCompletionService(compactionCleanerthreadPoolExecutor);
cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
}
@@ -802,7 +792,9 @@ public class HStore implements Store {
Collection compactedfiles =
storeEngine.getStoreFileManager().clearCompactedFiles();
// clear the compacted files
- removeCompactedFiles(compactedfiles);
+ if (compactedfiles != null && !compactedfiles.isEmpty()) {
+ removeCompactedfiles(compactedfiles);
+ }
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
ThreadPoolExecutor storeFileCloserThreadPool = this.region
@@ -844,9 +836,6 @@ public class HStore implements Store {
}
if (ioe != null) throw ioe;
}
- if (compactionCleanerthreadPoolExecutor != null) {
- compactionCleanerthreadPoolExecutor.shutdownNow();
- }
LOG.info("Closed " + this);
return result;
} finally {
@@ -2174,7 +2163,7 @@ public class HStore implements Store {
}
public static final long FIXED_OVERHEAD =
- ClassSize.align(ClassSize.OBJECT + (18 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
+ ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
+ (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
@@ -2311,92 +2300,72 @@ public class HStore implements Store {
} finally {
lock.readLock().unlock();
}
- removeCompactedFiles(copyCompactedfiles);
- }
-
- private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) {
- return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS,
- new ThreadFactory() {
- private int count = 1;
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "CompactedfilesArchiver-" + count++);
- }
- });
- }
-
- private void removeCompactedFiles(Collection compactedfiles) throws IOException {
- if (compactedfiles != null && !compactedfiles.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing the compacted store files " + compactedfiles);
- }
- for (final StoreFile file : compactedfiles) {
- completionService.submit(new Callable() {
- @Override
- public StoreFile call() throws IOException {
- synchronized (file) {
- try {
- StoreFile.Reader r = file.getReader();
- if (r == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The file " + file + " was closed but still not archived.");
- }
- return file;
- }
- if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
- // Even if deleting fails we need not bother as any new scanners won't be
- // able to use the compacted file as the status is already compactedAway
- if (LOG.isTraceEnabled()) {
- LOG.trace("Closing and archiving the file " + file.getPath());
- }
- r.close(true);
- // Just close and return
- return file;
- }
- } catch (Exception e) {
- LOG.error("Exception while trying to close the compacted store file "
- + file.getPath().getName());
- }
- }
- return null;
- }
- });
- }
- final List filesToRemove = new ArrayList(compactedfiles.size());
- try {
- for (final StoreFile file : compactedfiles) {
- Future future = completionService.take();
- StoreFile closedFile = future.get();
- if (closedFile != null) {
- filesToRemove.add(closedFile);
- }
- }
- } catch (InterruptedException ie) {
- LOG.error("Interrupted exception while closing the compacted files", ie);
- } catch (Exception e) {
- LOG.error("Exception occured while closing the compacted files", e);
- }
- if (isPrimaryReplicaStore()) {
- archiveAndRemoveCompactedFiles(filesToRemove);
- }
-
+ if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
+ removeCompactedfiles(copyCompactedfiles);
}
}
- private void archiveAndRemoveCompactedFiles(List filesToArchive) throws IOException {
- if (!filesToArchive.isEmpty()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Moving the files " + filesToArchive + " to archive");
+ /**
+ * Archives and removes the compacted files
+ * @param compactedfiles The compacted files in this store that are not active in reads
+ * @throws IOException
+ */
+ private void removeCompactedfiles(Collection compactedfiles)
+ throws IOException {
+ final List filesToRemove = new ArrayList(compactedfiles.size());
+ for (final StoreFile file : compactedfiles) {
+ synchronized (file) {
+ try {
+ StoreFile.Reader r = file.getReader();
+ if (r == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The file " + file + " was closed but still not archived.");
+ }
+ filesToRemove.add(file);
+ }
+ if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+ // Even if deleting fails we need not bother as any new scanners won't be
+ // able to use the compacted file as the status is already compactedAway
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closing and archiving the file " + file.getPath());
+ }
+ r.close(true);
+ // Just close and return
+ filesToRemove.add(file);
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "Exception while trying to close the compacted store file " + file.getPath().getName());
+ }
}
- // Only if this is successful it has to be removed
- this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToArchive);
- try {
- lock.writeLock().lock();
- this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToArchive);
- } finally {
- lock.writeLock().unlock();
+ }
+ if (this.isPrimaryReplicaStore()) {
+ // Only the primary region is allowed to move the file to archive.
+ // The secondary region does not move the files to archive. Any active reads from
+ // the secondary region will still work because the file as such has active readers on it.
+ if (!filesToRemove.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Moving the files " + filesToRemove + " to archive");
+ }
+ // Only if this is successful it has to be removed
+ this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove);
}
}
+ if (!filesToRemove.isEmpty()) {
+ // Clear the compactedfiles from the store file manager
+ clearCompactedfiles(filesToRemove);
+ }
+ }
+
+ private void clearCompactedfiles(final List filesToRemove) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
+ }
+ try {
+ lock.writeLock().lock();
+ this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
index 60fc9fb8ade..310108c0dc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
@@ -67,4 +67,10 @@ public interface OnlineRegions extends Server {
* @throws java.io.IOException
*/
List getOnlineRegions(TableName tableName) throws IOException;
+
+ /**
+ * Get all online regions in this RS.
+ * @return List of online Region
+ */
+ List getOnlineRegions();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
deleted file mode 100644
index 4cf120dca89..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischarger.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.compactions;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.Store;
-
-/**
- * A chore service that periodically cleans up the compacted files when there are no active readers
- * using those compacted files and also helps in clearing the block cache with these compacted
- * file entries
- */
-@InterfaceAudience.Private
-public class CompactedHFilesDischarger extends ScheduledChore {
- private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
- private Region region;
-
- /**
- * @param period the period of time to sleep between each run
- * @param stopper the stopper
- * @param region the store to identify the family name
- */
- public CompactedHFilesDischarger(final int period, final Stoppable stopper, final Region region) {
- // Need to add the config classes
- super("CompactedHFilesCleaner", stopper, period);
- this.region = region;
- }
-
- @Override
- public void chore() {
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "Started the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
- }
- for (Store store : region.getStores()) {
- try {
- store.closeAndArchiveCompactedFiles();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Completed archiving the compacted files for the region "
- + this.region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
- }
- } catch (Exception e) {
- LOG.error(
- "Exception while trying to close and archive the comapcted store files of the store "
- + store.getColumnFamilyName() + " in the region " + this.region.getRegionInfo(),
- e);
- }
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "Completed the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
- }
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 62e7c7c4857..633477e005e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -64,6 +64,9 @@ public class CompactionConfiguration {
public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT =
"hbase.hstore.min.locality.to.skip.major.compact";
+ public static final String HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT =
+ "hbase.hfile.compaction.discharger.thread.count";
+
Configuration conf;
StoreConfigInformation storeConfigInfo;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index a7fc75beffc..0986ad7812f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -109,6 +109,11 @@ public class MockRegionServerServices implements RegionServerServices {
return null;
}
+ @Override
+ public List getOnlineRegions() {
+ return null;
+ }
+
@Override
public void addToOnlineRegions(Region r) {
this.regions.put(r.getRegionInfo().getEncodedName(), r);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index 55e43de458d..64139ee5312 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.backup.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@@ -42,10 +44,11 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -79,6 +82,7 @@ public class TestZooKeeperTableArchiveClient {
private static ZKTableArchiveClient archivingClient;
private final List toCleanup = new ArrayList();
private static ClusterConnection CONNECTION;
+ private static RegionServerServices rss;
/**
* Setup the config for the cluster
@@ -93,6 +97,7 @@ public class TestZooKeeperTableArchiveClient {
ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
ZKUtil.createWithParents(watcher, archivingZNode);
+ rss = mock(RegionServerServices.class);
}
private static void setupConf(Configuration conf) {
@@ -173,8 +178,11 @@ public class TestZooKeeperTableArchiveClient {
// create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+ List regions = new ArrayList();
+ regions.add(region);
+ when(rss.getOnlineRegions()).thenReturn(regions);
final CompactedHFilesDischarger compactionCleaner =
- new CompactedHFilesDischarger(100, stop, region);
+ new CompactedHFilesDischarger(100, stop, rss, false);
loadFlushAndCompact(region, TEST_FAM);
compactionCleaner.chore();
// get the current hfiles in the archive directory
@@ -223,15 +231,21 @@ public class TestZooKeeperTableArchiveClient {
// create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+ List regions = new ArrayList();
+ regions.add(region);
+ when(rss.getOnlineRegions()).thenReturn(regions);
final CompactedHFilesDischarger compactionCleaner =
- new CompactedHFilesDischarger(100, stop, region);
+ new CompactedHFilesDischarger(100, stop, rss, false);
loadFlushAndCompact(region, TEST_FAM);
compactionCleaner.chore();
// create the another table that we don't archive
hcd = new HColumnDescriptor(TEST_FAM);
HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
- final CompactedHFilesDischarger compactionCleaner1 =
- new CompactedHFilesDischarger(100, stop, otherRegion);
+ regions = new ArrayList();
+ regions.add(otherRegion);
+ when(rss.getOnlineRegions()).thenReturn(regions);
+ final CompactedHFilesDischarger compactionCleaner1 = new CompactedHFilesDischarger(100, stop,
+ rss, false);
loadFlushAndCompact(otherRegion, TEST_FAM);
compactionCleaner1.chore();
// get the current hfiles in the archive directory
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 234ad20aeda..32f644bb884 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -461,6 +461,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return null;
}
+ @Override
+ public List getOnlineRegions() {
+ return null;
+ }
+
@Override
public OpenRegionResponse openRegion(RpcController controller,
OpenRegionRequest request) throws ServiceException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
index 60c5473e388..a6b6e4c2133 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
@@ -46,11 +46,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnaps
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@@ -60,6 +59,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -324,10 +324,17 @@ public class TestSnapshotFromMaster {
region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
region.compactStores(); // min is 2 so will compact and archive
}
- for (HRegion region : regions) {
- CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
- cleaner.chore();
+ List regionServerThreads = UTIL.getMiniHBaseCluster()
+ .getRegionServerThreads();
+ HRegionServer hrs = null;
+ for (RegionServerThread rs : regionServerThreads) {
+ if (!rs.getRegionServer().getOnlineRegions(TABLE_NAME).isEmpty()) {
+ hrs = rs.getRegionServer();
+ break;
+ }
}
+ CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
+ cleaner.chore();
LOG.info("After compaction File-System state");
FSUtils.logFileSystemState(fs, rootDir, LOG);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index c59d6f72f66..382193bf6a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -69,7 +70,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -167,9 +167,17 @@ public class TestHRegionReplayEvents {
when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
when(rss.getConfiguration()).thenReturn(CONF);
when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
-
+ String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
+ .toString();
+ ExecutorService es = new ExecutorService(string);
+ es.startExecutorService(
+ string+"-"+string, 1);
+ when(rss.getExecutorService()).thenReturn(es);
primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
primaryRegion.close();
+ List regions = new ArrayList();
+ regions.add(primaryRegion);
+ when(rss.getOnlineRegions()).thenReturn(regions);
primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
@@ -1370,7 +1378,10 @@ public class TestHRegionReplayEvents {
// Test case 3: compact primary files
primaryRegion.compactStores();
- CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, primaryRegion);
+ List regions = new ArrayList();
+ regions.add(primaryRegion);
+ when(rss.getOnlineRegions()).thenReturn(regions);
+ CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
cleaner.chore();
secondaryRegion.refreshStoreFiles();
assertPathListsEqual(primaryRegion.getStoreFileList(families),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index e0c1453ec8a..44b24cefcb7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -62,12 +62,12 @@ import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.util.StringUtils;
@@ -246,25 +246,37 @@ public class TestRegionMergeTransactionOnCluster {
count += hrfs.getStoreFiles(colFamily.getName()).size();
}
admin.compactRegion(mergedRegionInfo.getRegionName());
- // wait until merged region doesn't have reference file
+ // clean up the merged region store files
+ // wait until merged region have reference file
long timeout = System.currentTimeMillis() + waitTime;
+ int newcount = 0;
while (System.currentTimeMillis() < timeout) {
- if (!hrfs.hasReferences(tableDescriptor)) {
+ for(HColumnDescriptor colFamily : columnFamilies) {
+ newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+ }
+ if(newcount > count) {
break;
}
Thread.sleep(50);
}
- int newcount = 0;
- for(HColumnDescriptor colFamily : columnFamilies) {
- newcount += hrfs.getStoreFiles(colFamily.getName()).size();
- }
assertTrue(newcount > count);
- // clean up the merged region store files
- List regions =
- TEST_UTIL.getHBaseCluster().getRegions(tableDescriptor.getName());
- for (HRegion region : regions) {
- CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
+ List regionServerThreads = TEST_UTIL.getHBaseCluster()
+ .getRegionServerThreads();
+ for (RegionServerThread rs : regionServerThreads) {
+ CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null,
+ rs.getRegionServer(), false);
cleaner.chore();
+ Thread.sleep(1000);
+ }
+ int newcount1 = 0;
+ while (System.currentTimeMillis() < timeout) {
+ for(HColumnDescriptor colFamily : columnFamilies) {
+ newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
+ }
+ if(newcount1 <= 1) {
+ break;
+ }
+ Thread.sleep(50);
}
// run CatalogJanitor to clean merge references in hbase:meta and archive the
// files of merging regions
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 67258aa5367..99f580140f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
import java.io.IOException;
+import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -47,9 +48,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
@@ -454,8 +455,18 @@ public class TestRegionReplicas {
LOG.info("Force Major compaction on primary region " + hriPrimary);
primaryRegion.compact(true);
Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
+ List regionServerThreads = HTU.getMiniHBaseCluster()
+ .getRegionServerThreads();
+ HRegionServer hrs = null;
+ for (RegionServerThread rs : regionServerThreads) {
+ if (rs.getRegionServer()
+ .getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
+ hrs = rs.getRegionServer();
+ break;
+ }
+ }
CompactedHFilesDischarger cleaner =
- new CompactedHFilesDischarger(100, null, (HRegion) primaryRegion);
+ new CompactedHFilesDischarger(100, null, hrs, false);
cleaner.chore();
// scan all the hfiles on the secondary.
// since there are no read on the secondary when we ask locations to
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
index 40539c45c3f..c23e7942578 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,10 +40,12 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -62,6 +66,7 @@ public class TestCompactedHFilesDischarger {
private static CountDownLatch latch = new CountDownLatch(3);
private static AtomicInteger counter = new AtomicInteger(0);
private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
+ private RegionServerServices rss;
@Before
public void setUp() throws Exception {
@@ -71,6 +76,10 @@ public class TestCompactedHFilesDischarger {
HRegionInfo info = new HRegionInfo(tableName, null, null, false);
Path path = testUtil.getDataTestDir(getClass().getSimpleName());
region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
+ rss = mock(RegionServerServices.class);
+ List regions = new ArrayList();
+ regions.add(region);
+ when(rss.getOnlineRegions()).thenReturn(regions);
}
@After
@@ -86,7 +95,7 @@ public class TestCompactedHFilesDischarger {
public void testCompactedHFilesCleaner() throws Exception {
// Create the cleaner object
CompactedHFilesDischarger cleaner =
- new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+ new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@@ -152,7 +161,7 @@ public class TestCompactedHFilesDischarger {
public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
// Create the cleaner object
CompactedHFilesDischarger cleaner =
- new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+ new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@@ -223,7 +232,7 @@ public class TestCompactedHFilesDischarger {
public void testCleanerWithParallelScanners() throws Exception {
// Create the cleaner object
CompactedHFilesDischarger cleaner =
- new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
+ new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));