HBASE-15027 Refactor the way the CompactedHFileDischarger threads are

created (Ram)
This commit is contained in:
ramkrishna 2016-01-08 11:18:39 +05:30
parent f3ee6df0f2
commit 28c2b18d30
19 changed files with 374 additions and 221 deletions

View File

@ -265,7 +265,14 @@ public enum EventType {
*
* RS_REGION_REPLICA_FLUSH
*/
RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS);
RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS),
/**
* RS compacted files discharger <br>
*
* RS_COMPACTED_FILES_DISCHARGER
*/
RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER);
private final int code;
private final ExecutorType executor;

View File

@ -46,7 +46,8 @@ public enum ExecutorType {
RS_CLOSE_META (25),
RS_PARALLEL_SEEK (26),
RS_LOG_REPLAY_OPS (27),
RS_REGION_REPLICA_FLUSH_OPS (28);
RS_REGION_REPLICA_FLUSH_OPS (28),
RS_COMPACTED_FILES_DISCHARGER (29);
ExecutorType(int value) {}

View File

@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -78,7 +79,8 @@ public class ExecutorService {
* started with the same name, this throws a RuntimeException.
* @param name Name of the service to start.
*/
void startExecutorService(String name, int maxThreads) {
@VisibleForTesting
public void startExecutorService(String name, int maxThreads) {
if (this.executorMap.get(name) != null) {
throw new RuntimeException("An executor service with the name " + name +
" is already running!");

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HStore;
/**
* Event handler that handles the removal and archival of the compacted hfiles
*/
@InterfaceAudience.Private
public class CompactedHFilesDischargeHandler extends EventHandler {
private HStore store;
public CompactedHFilesDischargeHandler(Server server, EventType eventType, HStore store) {
super(server, eventType);
this.store = store;
}
@Override
public void process() throws IOException {
this.store.closeAndArchiveCompactedFiles();
}
}

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import com.google.common.annotations.VisibleForTesting;
/**
* A chore service that periodically cleans up the compacted files when there are no active readers
* using those compacted files and also helps in clearing the block cache with these compacted
* file entries
*/
@InterfaceAudience.Private
public class CompactedHFilesDischarger extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
private RegionServerServices regionServerServices;
// Default is to use executor
@VisibleForTesting
private boolean useExecutor = true;
/**
* @param period the period of time to sleep between each run
* @param stopper the stopper
* @param regionServerServices the region server that starts this chore
*/
public CompactedHFilesDischarger(final int period, final Stoppable stopper,
final RegionServerServices regionServerServices) {
// Need to add the config classes
super("CompactedHFilesCleaner", stopper, period);
this.regionServerServices = regionServerServices;
}
/**
* @param period the period of time to sleep between each run
* @param stopper the stopper
* @param regionServerServices the region server that starts this chore
* @param useExecutor true if to use the region server's executor service, false otherwise
*/
@VisibleForTesting
public CompactedHFilesDischarger(final int period, final Stoppable stopper,
final RegionServerServices regionServerServices, boolean useExecutor) {
// Need to add the config classes
this(period, stopper, regionServerServices);
this.useExecutor = useExecutor;
}
@Override
public void chore() {
List<Region> onlineRegions = regionServerServices.getOnlineRegions();
if (onlineRegions != null) {
for (Region region : onlineRegions) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"Started the compacted hfiles cleaner for the region " + region.getRegionInfo());
}
for (Store store : region.getStores()) {
try {
if (useExecutor && regionServerServices != null) {
CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
(Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
(HStore) store);
regionServerServices.getExecutorService().submit(handler);
} else {
// call synchronously if the RegionServerServices are not
// available
store.closeAndArchiveCompactedFiles();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Completed archiving the compacted files for the region "
+ region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
}
} catch (Exception e) {
LOG.error("Exception while trying to close and archive the comapcted store "
+ "files of the store " + store.getColumnFamilyName() + " in the" + " region "
+ region.getRegionInfo(), e);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace(
"Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
}
}
}
}
}

View File

@ -74,7 +74,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.DroppedSnapshotException;
@ -151,7 +150,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
@ -814,20 +812,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Initialize all the HStores
status.setStatus("Initializing all the Stores");
long maxSeqId = initializeStores(reporter, status);
// Start the CompactedHFilesDischarger here. This chore helps to remove the compacted files
// that will no longer be used in reads.
if (this.getRegionServerServices() != null) {
ChoreService choreService = this.getRegionServerServices().getChoreService();
if (choreService != null) {
// Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
// 2 mins so that compacted files can be archived before the TTLCleaner runs
int cleanerInterval =
conf.getInt("hbase.hfile.compactions.cleaner.interval", 2 * 60 * 1000);
this.compactedFileDischarger =
new CompactedHFilesDischarger(cleanerInterval, this.getRegionServerServices(), this);
choreService.scheduleChore(compactedFileDischarger);
}
}
this.mvcc.advanceTo(maxSeqId);
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
// Recover any edits if available.

View File

@ -134,6 +134,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@ -484,6 +485,8 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
*/
protected final ConfigurationManager configurationManager;
private CompactedHFilesDischarger compactedFileDischarger;
/**
* Starts a HRegionServer at the default location.
* @param conf
@ -615,6 +618,16 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
}
});
}
// Create the CompactedFileDischarger chore service. This chore helps to
// remove the compacted files
// that will no longer be used in reads.
// Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
// 2 mins so that compacted files can be archived before the TTLCleaner runs
int cleanerInterval =
conf.getInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
this.compactedFileDischarger =
new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this);
choreService.scheduleChore(compactedFileDischarger);
}
protected TableDescriptors getFsTableDescriptors() throws IOException {
@ -1716,7 +1729,9 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
}
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
// Start the threads for compacted files discharger
this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
@ -2725,6 +2740,15 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
return tableRegions;
}
@Override
public List<Region> getOnlineRegions() {
List<Region> allRegions = new ArrayList<Region>();
synchronized (this.onlineRegions) {
// Return a clone copy of the onlineRegions
allRegions.addAll(onlineRegions.values());
}
return allRegions;
}
/**
* Gets the online tables in this RS.
* This method looks at the in-memory onlineRegions.

View File

@ -37,9 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -76,7 +74,6 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@ -91,7 +88,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@ -139,8 +135,6 @@ public class HStore implements Store {
static int closeCheckInterval = 0;
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;
private ThreadPoolExecutor compactionCleanerthreadPoolExecutor = null;
private CompletionService<StoreFile> completionService = null;
/**
* RWLock for store operations.
@ -274,10 +268,6 @@ public class HStore implements Store {
"hbase.hstore.flush.retries.number must be > 0, not "
+ flushRetriesNumber);
}
compactionCleanerthreadPoolExecutor = getThreadPoolExecutor(
conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 10));
completionService =
new ExecutorCompletionService<StoreFile>(compactionCleanerthreadPoolExecutor);
cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
}
@ -802,7 +792,9 @@ public class HStore implements Store {
Collection<StoreFile> compactedfiles =
storeEngine.getStoreFileManager().clearCompactedFiles();
// clear the compacted files
removeCompactedFiles(compactedfiles);
if (compactedfiles != null && !compactedfiles.isEmpty()) {
removeCompactedfiles(compactedfiles);
}
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
ThreadPoolExecutor storeFileCloserThreadPool = this.region
@ -844,9 +836,6 @@ public class HStore implements Store {
}
if (ioe != null) throw ioe;
}
if (compactionCleanerthreadPoolExecutor != null) {
compactionCleanerthreadPoolExecutor.shutdownNow();
}
LOG.info("Closed " + this);
return result;
} finally {
@ -2174,7 +2163,7 @@ public class HStore implements Store {
}
public static final long FIXED_OVERHEAD =
ClassSize.align(ClassSize.OBJECT + (18 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
+ (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
@ -2311,92 +2300,72 @@ public class HStore implements Store {
} finally {
lock.readLock().unlock();
}
removeCompactedFiles(copyCompactedfiles);
}
private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) {
return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS,
new ThreadFactory() {
private int count = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "CompactedfilesArchiver-" + count++);
}
});
}
private void removeCompactedFiles(Collection<StoreFile> compactedfiles) throws IOException {
if (compactedfiles != null && !compactedfiles.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing the compacted store files " + compactedfiles);
}
for (final StoreFile file : compactedfiles) {
completionService.submit(new Callable<StoreFile>() {
@Override
public StoreFile call() throws IOException {
synchronized (file) {
try {
StoreFile.Reader r = file.getReader();
if (r == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("The file " + file + " was closed but still not archived.");
}
return file;
}
if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
// Even if deleting fails we need not bother as any new scanners won't be
// able to use the compacted file as the status is already compactedAway
if (LOG.isTraceEnabled()) {
LOG.trace("Closing and archiving the file " + file.getPath());
}
r.close(true);
// Just close and return
return file;
}
} catch (Exception e) {
LOG.error("Exception while trying to close the compacted store file "
+ file.getPath().getName());
}
}
return null;
}
});
}
final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
try {
for (final StoreFile file : compactedfiles) {
Future<StoreFile> future = completionService.take();
StoreFile closedFile = future.get();
if (closedFile != null) {
filesToRemove.add(closedFile);
}
}
} catch (InterruptedException ie) {
LOG.error("Interrupted exception while closing the compacted files", ie);
} catch (Exception e) {
LOG.error("Exception occured while closing the compacted files", e);
}
if (isPrimaryReplicaStore()) {
archiveAndRemoveCompactedFiles(filesToRemove);
}
if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
removeCompactedfiles(copyCompactedfiles);
}
}
private void archiveAndRemoveCompactedFiles(List<StoreFile> filesToArchive) throws IOException {
if (!filesToArchive.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Moving the files " + filesToArchive + " to archive");
/**
* Archives and removes the compacted files
* @param compactedfiles The compacted files in this store that are not active in reads
* @throws IOException
*/
private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
throws IOException {
final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
for (final StoreFile file : compactedfiles) {
synchronized (file) {
try {
StoreFile.Reader r = file.getReader();
if (r == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("The file " + file + " was closed but still not archived.");
}
filesToRemove.add(file);
}
if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
// Even if deleting fails we need not bother as any new scanners won't be
// able to use the compacted file as the status is already compactedAway
if (LOG.isTraceEnabled()) {
LOG.trace("Closing and archiving the file " + file.getPath());
}
r.close(true);
// Just close and return
filesToRemove.add(file);
}
} catch (Exception e) {
LOG.error(
"Exception while trying to close the compacted store file " + file.getPath().getName());
}
}
// Only if this is successful it has to be removed
this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToArchive);
try {
lock.writeLock().lock();
this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToArchive);
} finally {
lock.writeLock().unlock();
}
if (this.isPrimaryReplicaStore()) {
// Only the primary region is allowed to move the file to archive.
// The secondary region does not move the files to archive. Any active reads from
// the secondary region will still work because the file as such has active readers on it.
if (!filesToRemove.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Moving the files " + filesToRemove + " to archive");
}
// Only if this is successful it has to be removed
this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove);
}
}
if (!filesToRemove.isEmpty()) {
// Clear the compactedfiles from the store file manager
clearCompactedfiles(filesToRemove);
}
}
private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
}
try {
lock.writeLock().lock();
this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
} finally {
lock.writeLock().unlock();
}
}
}

View File

@ -67,4 +67,10 @@ public interface OnlineRegions extends Server {
* @throws java.io.IOException
*/
List<Region> getOnlineRegions(TableName tableName) throws IOException;
/**
* Get all online regions in this RS.
* @return List of online Region
*/
List<Region> getOnlineRegions();
}

View File

@ -1,74 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
/**
* A chore service that periodically cleans up the compacted files when there are no active readers
* using those compacted files and also helps in clearing the block cache with these compacted
* file entries
*/
@InterfaceAudience.Private
public class CompactedHFilesDischarger extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
private Region region;
/**
* @param period the period of time to sleep between each run
* @param stopper the stopper
* @param region the store to identify the family name
*/
public CompactedHFilesDischarger(final int period, final Stoppable stopper, final Region region) {
// Need to add the config classes
super("CompactedHFilesCleaner", stopper, period);
this.region = region;
}
@Override
public void chore() {
if (LOG.isTraceEnabled()) {
LOG.trace(
"Started the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
}
for (Store store : region.getStores()) {
try {
store.closeAndArchiveCompactedFiles();
if (LOG.isTraceEnabled()) {
LOG.trace("Completed archiving the compacted files for the region "
+ this.region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
}
} catch (Exception e) {
LOG.error(
"Exception while trying to close and archive the comapcted store files of the store "
+ store.getColumnFamilyName() + " in the region " + this.region.getRegionInfo(),
e);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace(
"Completed the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
}
}
}

View File

@ -64,6 +64,9 @@ public class CompactionConfiguration {
public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT =
"hbase.hstore.min.locality.to.skip.major.compact";
public static final String HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT =
"hbase.hfile.compaction.discharger.thread.count";
Configuration conf;
StoreConfigInformation storeConfigInfo;

View File

@ -109,6 +109,11 @@ public class MockRegionServerServices implements RegionServerServices {
return null;
}
@Override
public List<Region> getOnlineRegions() {
return null;
}
@Override
public void addToOnlineRegions(Region r) {
this.regions.put(r.getRegionInfo().getEncodedName(), r);

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.backup.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@ -42,10 +44,11 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -79,6 +82,7 @@ public class TestZooKeeperTableArchiveClient {
private static ZKTableArchiveClient archivingClient;
private final List<Path> toCleanup = new ArrayList<Path>();
private static ClusterConnection CONNECTION;
private static RegionServerServices rss;
/**
* Setup the config for the cluster
@ -93,6 +97,7 @@ public class TestZooKeeperTableArchiveClient {
ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
ZKUtil.createWithParents(watcher, archivingZNode);
rss = mock(RegionServerServices.class);
}
private static void setupConf(Configuration conf) {
@ -173,8 +178,11 @@ public class TestZooKeeperTableArchiveClient {
// create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
List<Region> regions = new ArrayList<Region>();
regions.add(region);
when(rss.getOnlineRegions()).thenReturn(regions);
final CompactedHFilesDischarger compactionCleaner =
new CompactedHFilesDischarger(100, stop, region);
new CompactedHFilesDischarger(100, stop, rss, false);
loadFlushAndCompact(region, TEST_FAM);
compactionCleaner.chore();
// get the current hfiles in the archive directory
@ -223,15 +231,21 @@ public class TestZooKeeperTableArchiveClient {
// create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
List<Region> regions = new ArrayList<Region>();
regions.add(region);
when(rss.getOnlineRegions()).thenReturn(regions);
final CompactedHFilesDischarger compactionCleaner =
new CompactedHFilesDischarger(100, stop, region);
new CompactedHFilesDischarger(100, stop, rss, false);
loadFlushAndCompact(region, TEST_FAM);
compactionCleaner.chore();
// create the another table that we don't archive
hcd = new HColumnDescriptor(TEST_FAM);
HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
final CompactedHFilesDischarger compactionCleaner1 =
new CompactedHFilesDischarger(100, stop, otherRegion);
regions = new ArrayList<Region>();
regions.add(otherRegion);
when(rss.getOnlineRegions()).thenReturn(regions);
final CompactedHFilesDischarger compactionCleaner1 = new CompactedHFilesDischarger(100, stop,
rss, false);
loadFlushAndCompact(otherRegion, TEST_FAM);
compactionCleaner1.chore();
// get the current hfiles in the archive directory

View File

@ -461,6 +461,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return null;
}
@Override
public List<Region> getOnlineRegions() {
return null;
}
@Override
public OpenRegionResponse openRegion(RpcController controller,
OpenRegionRequest request) throws ServiceException {

View File

@ -46,11 +46,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnaps
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@ -60,6 +59,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -324,10 +324,17 @@ public class TestSnapshotFromMaster {
region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
region.compactStores(); // min is 2 so will compact and archive
}
for (HRegion region : regions) {
CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
cleaner.chore();
List<RegionServerThread> regionServerThreads = UTIL.getMiniHBaseCluster()
.getRegionServerThreads();
HRegionServer hrs = null;
for (RegionServerThread rs : regionServerThreads) {
if (!rs.getRegionServer().getOnlineRegions(TABLE_NAME).isEmpty()) {
hrs = rs.getRegionServer();
break;
}
}
CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
cleaner.chore();
LOG.info("After compaction File-System state");
FSUtils.logFileSystemState(fs, rootDir, LOG);

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -69,7 +70,6 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -167,9 +167,17 @@ public class TestHRegionReplayEvents {
when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
when(rss.getConfiguration()).thenReturn(CONF);
when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
.toString();
ExecutorService es = new ExecutorService(string);
es.startExecutorService(
string+"-"+string, 1);
when(rss.getExecutorService()).thenReturn(es);
primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
primaryRegion.close();
List<Region> regions = new ArrayList<Region>();
regions.add(primaryRegion);
when(rss.getOnlineRegions()).thenReturn(regions);
primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
@ -1370,7 +1378,10 @@ public class TestHRegionReplayEvents {
// Test case 3: compact primary files
primaryRegion.compactStores();
CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, primaryRegion);
List<Region> regions = new ArrayList<Region>();
regions.add(primaryRegion);
when(rss.getOnlineRegions()).thenReturn(regions);
CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
cleaner.chore();
secondaryRegion.refreshStoreFiles();
assertPathListsEqual(primaryRegion.getStoreFileList(families),

View File

@ -62,12 +62,12 @@ import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.util.StringUtils;
@ -246,25 +246,37 @@ public class TestRegionMergeTransactionOnCluster {
count += hrfs.getStoreFiles(colFamily.getName()).size();
}
admin.compactRegion(mergedRegionInfo.getRegionName());
// wait until merged region doesn't have reference file
// clean up the merged region store files
// wait until merged region have reference file
long timeout = System.currentTimeMillis() + waitTime;
int newcount = 0;
while (System.currentTimeMillis() < timeout) {
if (!hrfs.hasReferences(tableDescriptor)) {
for(HColumnDescriptor colFamily : columnFamilies) {
newcount += hrfs.getStoreFiles(colFamily.getName()).size();
}
if(newcount > count) {
break;
}
Thread.sleep(50);
}
int newcount = 0;
for(HColumnDescriptor colFamily : columnFamilies) {
newcount += hrfs.getStoreFiles(colFamily.getName()).size();
}
assertTrue(newcount > count);
// clean up the merged region store files
List<HRegion> regions =
TEST_UTIL.getHBaseCluster().getRegions(tableDescriptor.getName());
for (HRegion region : regions) {
CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
.getRegionServerThreads();
for (RegionServerThread rs : regionServerThreads) {
CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null,
rs.getRegionServer(), false);
cleaner.chore();
Thread.sleep(1000);
}
int newcount1 = 0;
while (System.currentTimeMillis() < timeout) {
for(HColumnDescriptor colFamily : columnFamilies) {
newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
}
if(newcount1 <= 1) {
break;
}
Thread.sleep(50);
}
// run CatalogJanitor to clean merge references in hbase:meta and archive the
// files of merging regions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -47,9 +48,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
@ -454,8 +455,18 @@ public class TestRegionReplicas {
LOG.info("Force Major compaction on primary region " + hriPrimary);
primaryRegion.compact(true);
Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
List<RegionServerThread> regionServerThreads = HTU.getMiniHBaseCluster()
.getRegionServerThreads();
HRegionServer hrs = null;
for (RegionServerThread rs : regionServerThreads) {
if (rs.getRegionServer()
.getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
hrs = rs.getRegionServer();
break;
}
}
CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(100, null, (HRegion) primaryRegion);
new CompactedHFilesDischarger(100, null, hrs, false);
cleaner.chore();
// scan all the hfiles on the secondary.
// since there are no read on the secondary when we ask locations to

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@ -38,10 +40,12 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -62,6 +66,7 @@ public class TestCompactedHFilesDischarger {
private static CountDownLatch latch = new CountDownLatch(3);
private static AtomicInteger counter = new AtomicInteger(0);
private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
private RegionServerServices rss;
@Before
public void setUp() throws Exception {
@ -71,6 +76,10 @@ public class TestCompactedHFilesDischarger {
HRegionInfo info = new HRegionInfo(tableName, null, null, false);
Path path = testUtil.getDataTestDir(getClass().getSimpleName());
region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
rss = mock(RegionServerServices.class);
List<Region> regions = new ArrayList<Region>();
regions.add(region);
when(rss.getOnlineRegions()).thenReturn(regions);
}
@After
@ -86,7 +95,7 @@ public class TestCompactedHFilesDischarger {
public void testCompactedHFilesCleaner() throws Exception {
// Create the cleaner object
CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@ -152,7 +161,7 @@ public class TestCompactedHFilesDischarger {
public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
// Create the cleaner object
CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@ -223,7 +232,7 @@ public class TestCompactedHFilesDischarger {
public void testCleanerWithParallelScanners() throws Exception {
// Create the cleaner object
CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));