HBASE-14970 Backport HBASE-13082 and its sub-jira to branch-1 (Ram)

ramkrishna 2016-01-21 21:22:40 +05:30
parent cea43788fe
commit 58521869b0
36 changed files with 1327 additions and 181 deletions

EventType.java

@ -265,7 +265,14 @@ public enum EventType {
*
* RS_REGION_REPLICA_FLUSH
*/
- RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS);
+ RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS),
/**
* RS compacted files discharger <br>
*
* RS_COMPACTED_FILES_DISCHARGER
*/
RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER);
private final int code;
private final ExecutorType executor;

ExecutorType.java

@ -46,7 +46,8 @@ public enum ExecutorType {
RS_CLOSE_META (25),
RS_PARALLEL_SEEK (26),
RS_LOG_REPLAY_OPS (27),
- RS_REGION_REPLICA_FLUSH_OPS (28);
+ RS_REGION_REPLICA_FLUSH_OPS (28),
RS_COMPACTED_FILES_DISCHARGER (29);
ExecutorType(int value) {}

ExecutorService.java

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -86,7 +87,8 @@ public class ExecutorService {
* started with the same name, this throws a RuntimeException.
* @param name Name of the service to start.
*/
- void startExecutorService(String name, int maxThreads) {
+ @VisibleForTesting
+ public void startExecutorService(String name, int maxThreads) {
if (this.executorMap.get(name) != null) {
throw new RuntimeException("An executor service with the name " + name +
" is already running!");

ChangedReadersObserver.java

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -33,5 +34,5 @@ public interface ChangedReadersObserver {
* Notify observers.
* @throws IOException e
*/
- void updateReaders() throws IOException;
+ void updateReaders(List<StoreFile> sfs) throws IOException;
}
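For orientation, a minimal sketch (not part of this commit) of what an implementor of the widened callback looks like; the LoggingReadersObserver name is illustrative, only ChangedReadersObserver and StoreFile come from the patch:

import java.io.IOException;
import java.util.List;

// Hypothetical observer: real implementors (StoreScanner, later in this commit) record
// the flushed files so they can open scanners on exactly those files afterwards.
class LoggingReadersObserver implements ChangedReadersObserver {
  @Override
  public void updateReaders(List<StoreFile> sfs) throws IOException {
    for (StoreFile sf : sfs) {
      System.out.println("flushed file now readable: " + sf);
    }
  }
}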

CompactedHFilesDischargeHandler.java

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HStore;
/**
* Event handler that handles the removal and archival of the compacted hfiles
*/
@InterfaceAudience.Private
public class CompactedHFilesDischargeHandler extends EventHandler {
private HStore store;
public CompactedHFilesDischargeHandler(Server server, EventType eventType, HStore store) {
super(server, eventType);
this.store = store;
}
@Override
public void process() throws IOException {
this.store.closeAndArchiveCompactedFiles();
}
}
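As a rough usage sketch (not part of the commit), this is how the handler is meant to be dispatched; it mirrors the submission done by the chore in the next file, and the server/executor/store parameters are assumed to come from a running region server:

import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;

class DischargeSubmitExample {
  // Hands a store off to the RS_COMPACTED_FILES_DISCHARGER pool instead of
  // archiving the compacted files on the calling thread.
  static void submit(Server server, ExecutorService executor, HStore store) {
    CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
        server, EventType.RS_COMPACTED_FILES_DISCHARGER, store);
    executor.submit(handler);
  }
}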

CompactedHFilesDischarger.java

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventType;
import com.google.common.annotations.VisibleForTesting;
/**
* A chore service that periodically cleans up the compacted files when there are no active readers
* using those compacted files and also helps in clearing the block cache with these compacted
* file entries
*/
@InterfaceAudience.Private
public class CompactedHFilesDischarger extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
private RegionServerServices regionServerServices;
// Default is to use executor
@VisibleForTesting
private boolean useExecutor = true;
/**
* @param period the period of time to sleep between each run
* @param stopper the stopper
* @param regionServerServices the region server that starts this chore
*/
public CompactedHFilesDischarger(final int period, final Stoppable stopper,
final RegionServerServices regionServerServices) {
// Need to add the config classes
super("CompactedHFilesCleaner", stopper, period);
this.regionServerServices = regionServerServices;
}
/**
* @param period the period of time to sleep between each run
* @param stopper the stopper
* @param regionServerServices the region server that starts this chore
* @param useExecutor true if to use the region server's executor service, false otherwise
*/
@VisibleForTesting
public CompactedHFilesDischarger(final int period, final Stoppable stopper,
final RegionServerServices regionServerServices, boolean useExecutor) {
// Need to add the config classes
this(period, stopper, regionServerServices);
this.useExecutor = useExecutor;
}
@Override
public void chore() {
if (regionServerServices == null) return;
List<Region> onlineRegions = regionServerServices.getOnlineRegions();
if (onlineRegions != null) {
for (Region region : onlineRegions) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"Started the compacted hfiles cleaner for the region " + region.getRegionInfo());
}
for (Store store : region.getStores()) {
try {
if (useExecutor && regionServerServices != null) {
CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
(Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
(HStore) store);
regionServerServices.getExecutorService().submit(handler);
} else {
// call synchronously if the RegionServerServices are not
// available
store.closeAndArchiveCompactedFiles();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Completed archiving the compacted files for the region "
+ region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
}
} catch (Exception e) {
LOG.error("Exception while trying to close and archive the comapcted store "
+ "files of the store " + store.getColumnFamilyName() + " in the" + " region "
+ region.getRegionInfo(), e);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace(
"Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
}
}
}
}
}
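A usage sketch (illustrative, not part of the commit) showing both ways the chore is meant to be driven; it mirrors the HRegionServer wiring that appears later in this commit and assumes a ChoreService, Stoppable and RegionServerServices are already at hand:

import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Stoppable;

class DischargerWiringExample {
  static void schedule(ChoreService choreService, Stoppable stopper, RegionServerServices services) {
    int intervalMs = 2 * 60 * 1000; // the 2-minute default HRegionServer uses below
    choreService.scheduleChore(new CompactedHFilesDischarger(intervalMs, stopper, services));

    // In tests the extra boolean disables the executor hand-off, so chore()
    // archives the compacted files synchronously on the calling thread.
    CompactedHFilesDischarger sync =
        new CompactedHFilesDischarger(intervalMs, stopper, services, false);
    sync.chore();
  }
}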

DefaultStoreFileManager.java

@ -54,6 +54,13 @@ class DefaultStoreFileManager implements StoreFileManager {
* is atomically replaced when its contents change.
*/
private volatile ImmutableList<StoreFile> storefiles = null;
/**
* List of compacted files inside this store that needs to be excluded in reads
* because further new reads will be using only the newly created files out of compaction.
* These compacted files will be deleted/cleared once all the existing readers on these
* compacted files are done.
*/
private volatile List<StoreFile> compactedfiles = null;
public DefaultStoreFileManager(KVComparator kvComparator, Configuration conf,
CompactionConfiguration comConf) {
@ -74,6 +81,11 @@ class DefaultStoreFileManager implements StoreFileManager {
return storefiles;
}
@Override
public Collection<StoreFile> getCompactedfiles() {
return compactedfiles;
}
@Override
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
@ -88,6 +100,13 @@ class DefaultStoreFileManager implements StoreFileManager {
return result;
}
@Override
public Collection<StoreFile> clearCompactedFiles() {
List<StoreFile> result = compactedfiles;
compactedfiles = new ArrayList<StoreFile>();
return result;
}
@Override
public final int getStorefileCount() {
return storefiles.size();
@ -95,13 +114,42 @@ class DefaultStoreFileManager implements StoreFileManager {
@Override
public void addCompactionResults(
- Collection<StoreFile> compactedFiles, Collection<StoreFile> results) {
+ Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {
ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
- newStoreFiles.removeAll(compactedFiles);
+ newStoreFiles.removeAll(newCompactedfiles);
if (!results.isEmpty()) {
newStoreFiles.addAll(results);
}
sortAndSetStoreFiles(newStoreFiles);
ArrayList<StoreFile> updatedCompactedfiles = null;
if (this.compactedfiles != null) {
updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles);
updatedCompactedfiles.addAll(newCompactedfiles);
} else {
updatedCompactedfiles = new ArrayList<StoreFile>(newCompactedfiles);
}
markCompactedAway(newCompactedfiles);
this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
}
// Mark the files as compactedAway once the storefiles and compactedfiles list is finalised
// Let a background thread close the actual reader on these compacted files and also
// ensure to evict the blocks from block cache so that they are no longer in
// cache
private void markCompactedAway(Collection<StoreFile> compactedFiles) {
for (StoreFile file : compactedFiles) {
file.markCompactedAway();
}
}
@Override
public void removeCompactedFiles(Collection<StoreFile> removedCompactedfiles) throws IOException {
ArrayList<StoreFile> updatedCompactedfiles = null;
if (this.compactedfiles != null) {
updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles);
updatedCompactedfiles.removeAll(removedCompactedfiles);
this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
}
}
@Override
@ -166,6 +214,12 @@ class DefaultStoreFileManager implements StoreFileManager {
storefiles = ImmutableList.copyOf(storeFiles);
}
private List<StoreFile> sortCompactedfiles(List<StoreFile> storefiles) {
// Sorting may not be really needed here for the compacted files?
Collections.sort(storefiles, StoreFile.Comparators.SEQ_ID);
return new ArrayList<StoreFile>(storefiles);
}
@Override
public double getCompactionPressure() {
int storefileCount = getStorefileCount();
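The new compactedfiles field uses the same copy-on-write idiom as storefiles: build a private copy, then publish it with a single volatile write so readers always see a complete snapshot. A generic sketch of the idiom (plain Java, not HBase code):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class CopyOnWriteFileList<T> {
  // Readers only ever see a fully built snapshot, never a half-updated list.
  private volatile List<T> files = Collections.emptyList();

  // Writers are expected to be serialized externally (HStore holds its write lock).
  void add(Collection<T> newFiles) {
    List<T> updated = new ArrayList<>(files);
    updated.addAll(newFiles);
    files = updated; // single volatile publish
  }

  List<T> snapshot() {
    return files; // safe to hand out without locking
  }
}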

HRegion.java

@ -1517,6 +1517,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (this.metricsRegionWrapper != null) {
Closeables.closeQuietly(this.metricsRegionWrapper);
}
status.markComplete("Closed");
LOG.info("Closed " + this);
return result;
@ -6790,6 +6791,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
dstRegion.getRegionFileSystem().logFileSystemState(LOG);
}
// clear the compacted files if any
for (Store s : dstRegion.getStores()) {
s.closeAndArchiveCompactedFiles();
}
if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
throw new IOException("Merged region " + dstRegion
+ " still has references after the compaction, is compaction canceled?");

HRegionServer.java

@ -133,6 +133,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@ -479,6 +480,8 @@ public class HRegionServer extends HasThread implements
*/
protected final ConfigurationManager configurationManager;
private CompactedHFilesDischarger compactedFileDischarger;
/**
* Starts a HRegionServer at the default location.
* @param conf
@ -611,6 +614,16 @@ public class HRegionServer extends HasThread implements
}
});
}
// Create the CompactedFileDischarger chore service. This chore helps to
// remove the compacted files
// that will no longer be used in reads.
// Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
// 2 mins so that compacted files can be archived before the TTLCleaner runs
int cleanerInterval = conf
.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_INTERVAL, 2 * 60 * 1000);
this.compactedFileDischarger =
new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this);
choreService.scheduleChore(compactedFileDischarger);
}
/*
@ -1708,7 +1721,9 @@ public class HRegionServer extends HasThread implements
}
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
// Start the threads for compacted files discharger
this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
@ -2732,6 +2747,15 @@ public class HRegionServer extends HasThread implements
return coprocessors.toArray(new String[coprocessors.size()]);
}
@Override
public List<Region> getOnlineRegions() {
List<Region> allRegions = new ArrayList<Region>();
synchronized (this.onlineRegions) {
// Return a clone copy of the onlineRegions
allRegions.addAll(onlineRegions.values());
}
return allRegions;
}
/**
* Try to close the region, logs a warning on failure but continues.
* @param region Region to close

HStore.java

@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
- import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
@ -273,7 +272,6 @@ public class HStore implements Store {
"hbase.hstore.flush.retries.number must be > 0, not " "hbase.hstore.flush.retries.number must be > 0, not "
+ flushRetriesNumber); + flushRetriesNumber);
} }
// Crypto context for new store files // Crypto context for new store files
String cipherName = family.getEncryptionType(); String cipherName = family.getEncryptionType();
if (cipherName != null) { if (cipherName != null) {
@ -531,14 +529,15 @@ public class HStore implements Store {
try {
Future<StoreFile> future = completionService.take();
StoreFile storeFile = future.get();
- long length = storeFile.getReader().length();
- this.storeSize += length;
- this.totalUncompressedBytes +=
- storeFile.getReader().getTotalUncompressedBytes();
- if (LOG.isDebugEnabled()) {
- LOG.debug("loaded " + storeFile.toStringDetailed());
- }
- results.add(storeFile);
+ if (storeFile != null) {
+ long length = storeFile.getReader().length();
+ this.storeSize += length;
+ this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("loaded " + storeFile.toStringDetailed());
+ }
+ results.add(storeFile);
+ }
} catch (InterruptedException e) {
if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
@ -636,8 +635,7 @@ public class HStore implements Store {
region.getMVCC().advanceTo(this.getMaxSequenceId());
}
- // notify scanners, close file readers, and recompute store size
- completeCompaction(toBeRemovedStoreFiles, false);
+ completeCompaction(toBeRemovedStoreFiles);
}
private StoreFile createStoreFileAndReader(final Path p) throws IOException {
@ -814,7 +812,6 @@ public class HStore implements Store {
// the lock.
this.lock.writeLock().unlock();
}
- notifyChangedReadersObservers();
LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
if (LOG.isTraceEnabled()) {
String traceMessage = "BULK LOAD time,size,store size,store files ["
@ -830,7 +827,12 @@ public class HStore implements Store {
try {
// Clear so metrics doesn't find them.
ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
Collection<StoreFile> compactedfiles =
storeEngine.getStoreFileManager().clearCompactedFiles();
// clear the compacted files
if (compactedfiles != null && !compactedfiles.isEmpty()) {
removeCompactedfiles(compactedfiles);
}
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
ThreadPoolExecutor storeFileCloserThreadPool = this.region
@ -1067,10 +1069,8 @@ public class HStore implements Store {
// the lock.
this.lock.writeLock().unlock();
}
- // Tell listeners of the change in readers.
- notifyChangedReadersObservers();
+ // notify to be called here - only in case of flushes
+ notifyChangedReadersObservers(sfs);
if (LOG.isTraceEnabled()) {
long totalSize = 0;
for (StoreFile sf : sfs) {
@ -1088,9 +1088,9 @@ public class HStore implements Store {
* Notify all observers that set of Readers has changed.
* @throws IOException
*/
- private void notifyChangedReadersObservers() throws IOException {
- for (ChangedReadersObserver o: this.changedReaderObservers) {
- o.updateReaders();
+ private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
+ for (ChangedReadersObserver o : this.changedReaderObservers) {
+ o.updateReaders(sfs);
}
}
@ -1129,6 +1129,30 @@ public class HStore implements Store {
return scanners;
}
@Override
public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
List<KeyValueScanner> memStoreScanners = null;
if (includeMemstoreScanner) {
this.lock.readLock().lock();
try {
memStoreScanners = this.memstore.getScanners(readPt);
} finally {
this.lock.readLock().unlock();
}
}
List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
scanners.addAll(sfScanners);
// Then the memstore scanners
if (memStoreScanners != null) {
scanners.addAll(memStoreScanners);
}
return scanners;
}
@Override
public void addChangedReaderObserver(ChangedReadersObserver o) {
this.changedReaderObservers.add(o);
@ -1248,7 +1272,7 @@ public class HStore implements Store {
compactedCellsSize += getCompactionProgress().totalCompactedSize;
}
// At this point the store will use new files for all new scanners.
- completeCompaction(filesToCompact, true); // Archive old files & update store size.
+ completeCompaction(filesToCompact); // update store size.
logCompactionEndMessage(cr, sfs, compactionStartTime);
return sfs;
@ -1436,7 +1460,7 @@ public class HStore implements Store {
LOG.info("Replaying compaction marker, replacing input files: " + LOG.info("Replaying compaction marker, replacing input files: " +
inputStoreFiles + " with output files : " + outputStoreFiles); inputStoreFiles + " with output files : " + outputStoreFiles);
this.replaceStoreFiles(inputStoreFiles, outputStoreFiles); this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
this.completeCompaction(inputStoreFiles, removeFiles); this.completeCompaction(inputStoreFiles);
} }
} }
@ -1488,7 +1512,7 @@ public class HStore implements Store {
this.getCoprocessorHost().postCompact(this, sf, null);
}
replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
- completeCompaction(filesToCompact, true);
+ completeCompaction(filesToCompact);
}
} finally {
synchronized (filesCompacting) {
@ -1771,32 +1795,7 @@ public class HStore implements Store {
@VisibleForTesting
protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
throws IOException {
- try {
- // Do not delete old store files until we have sent out notification of
- // change in case old files are still being accessed by outstanding scanners.
- // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
- // scenario that could have happened if continue to hold the lock.
- notifyChangedReadersObservers();
- // At this point the store will use new files for all scanners.
- // let the archive util decide if we should archive or delete the files
- LOG.debug("Removing store files after compaction...");
- boolean evictOnClose =
- cacheConf != null? cacheConf.shouldEvictOnClose(): true;
- for (StoreFile compactedFile : compactedFiles) {
- compactedFile.closeReader(evictOnClose);
- }
- if (removeFiles) {
- this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
- }
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed removing compacted files in " + this +
- ". Files we were trying to remove are " + compactedFiles.toString() +
- "; some of them may have been already removed", e);
- }
- // 4. Compute new store size
+ LOG.debug("Completing compaction...");
this.storeSize = 0L;
this.totalUncompressedBytes = 0L;
for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
@ -2490,4 +2489,92 @@ public class HStore implements Store {
public boolean isPrimaryReplicaStore() {
return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
}
@Override
public void closeAndArchiveCompactedFiles() throws IOException {
lock.readLock().lock();
Collection<StoreFile> copyCompactedfiles = null;
try {
Collection<StoreFile> compactedfiles =
this.getStoreEngine().getStoreFileManager().getCompactedfiles();
if (compactedfiles != null && compactedfiles.size() != 0) {
// Do a copy under read lock
copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("No compacted files to archive");
return;
}
}
} finally {
lock.readLock().unlock();
}
if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
removeCompactedfiles(copyCompactedfiles);
}
}
/**
* Archives and removes the compacted files
* @param compactedfiles The compacted files in this store that are not active in reads
* @throws IOException
*/
private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
throws IOException {
final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
for (final StoreFile file : compactedfiles) {
synchronized (file) {
try {
StoreFile.Reader r = file.getReader();
if (r == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("The file " + file + " was closed but still not archived.");
}
filesToRemove.add(file);
}
if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
// Even if deleting fails we need not bother as any new scanners won't be
// able to use the compacted file as the status is already compactedAway
if (LOG.isTraceEnabled()) {
LOG.trace("Closing and archiving the file " + file.getPath());
}
r.close(true);
// Just close and return
filesToRemove.add(file);
}
} catch (Exception e) {
LOG.error(
"Exception while trying to close the compacted store file " + file.getPath().getName());
}
}
}
if (this.isPrimaryReplicaStore()) {
// Only the primary region is allowed to move the file to archive.
// The secondary region does not move the files to archive. Any active reads from
// the secondary region will still work because the file as such has active readers on it.
if (!filesToRemove.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Moving the files " + filesToRemove + " to archive");
}
// Only if this is successful it has to be removed
this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove);
}
}
if (!filesToRemove.isEmpty()) {
// Clear the compactedfiles from the store file manager
clearCompactedfiles(filesToRemove);
}
}
private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
}
try {
lock.writeLock().lock();
this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
} finally {
lock.writeLock().unlock();
}
}
}
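closeAndArchiveCompactedFiles() is deliberate about lock scope: it snapshots the compacted list under the read lock, closes readers and archives files with no lock held, and only takes the write lock to drop the archived entries. A generic restatement of that ordering (illustrative, not the committed code):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SnapshotThenArchive<T> {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final List<T> tracked = new ArrayList<>();

  interface ArchiveAction<T> {
    Collection<T> run(List<T> files) throws Exception;
  }

  void discharge(ArchiveAction<T> archive) throws Exception {
    List<T> copy;
    lock.readLock().lock();
    try {
      copy = new ArrayList<>(tracked);          // copy only; no I/O under the read lock
    } finally {
      lock.readLock().unlock();
    }
    Collection<T> archived = archive.run(copy); // close readers / move files, unlocked
    lock.writeLock().lock();
    try {
      tracked.removeAll(archived);              // drop only what was actually archived
    } finally {
      lock.writeLock().unlock();
    }
  }
}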

OnlineRegions.java

@ -67,4 +67,10 @@ public interface OnlineRegions extends Server {
* @throws java.io.IOException
*/
List<Region> getOnlineRegions(TableName tableName) throws IOException;
/**
* Get all online regions in this RS.
* @return List of online Region
*/
List<Region> getOnlineRegions();
}

ReversedStoreScanner.java

@ -124,24 +124,15 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
- lock.lock();
- try {
- checkReseek();
- return this.heap.seekToPreviousRow(key);
- } finally {
- lock.unlock();
- }
+ boolean flushed = checkFlushed();
+ checkReseek(flushed);
+ return this.heap.seekToPreviousRow(key);
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
- lock.lock();
- try {
- checkReseek();
- return this.heap.backwardSeek(key);
- } finally {
- lock.unlock();
- }
+ boolean flushed = checkFlushed();
+ checkReseek(flushed);
+ return this.heap.backwardSeek(key);
}
}

Store.java

@ -106,6 +106,25 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
long readPt
) throws IOException;
/**
* Create scanners on the given files and if needed on the memstore with no filtering based on TTL
* (that happens further down the line).
* @param files the list of files on which the scanners has to be created
* @param cacheBlocks cache the blocks or not
* @param isGet true if it is get, false if not
* @param usePread true to use pread, false if not
* @param isCompaction true if the scanner is created for compaction
* @param matcher the scan query matcher
* @param startRow the start row
* @param stopRow the stop row
* @param readPt the read point of the current scan
* @param includeMemstoreScanner true if memstore has to be included
* @return scanners on the given files and on the memstore if specified
*/
List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks, boolean isGet,
boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException;
ScanInfo getScanInfo();
/**
@ -480,4 +499,9 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException;
boolean isPrimaryReplicaStore();
/**
* Closes and archives the compacted files under this store
*/
void closeAndArchiveCompactedFiles() throws IOException;
}

StoreFile.java

@ -30,6 +30,8 @@ import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@ -368,6 +371,19 @@ public class StoreFile {
return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
}
@VisibleForTesting
public boolean isCompactedAway() {
if (this.reader != null) {
return this.reader.isCompactedAway();
}
return true;
}
@VisibleForTesting
public int getRefCount() {
return this.reader.refCount.get();
}
/**
* Return the timestamp at which this bulk load file was generated.
*/
@ -536,6 +552,15 @@ public class StoreFile {
}
}
/**
* Marks the status of the file as compactedAway.
*/
public void markCompactedAway() {
if (this.reader != null) {
this.reader.markCompactedAway();
}
}
/**
* Delete this file
* @throws IOException
@ -1072,6 +1097,12 @@ public class StoreFile {
private byte[] lastBloomKey;
private long deleteFamilyCnt = -1;
private boolean bulkLoadResult = false;
// Counter that is incremented every time a scanner is created on the
// store file. It is decremented when the scan on the store file is
// done.
private AtomicInteger refCount = new AtomicInteger(0);
// Indicates if the file got compacted
private volatile boolean compactedAway = false;
public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
throws IOException {
@ -1079,6 +1110,10 @@ public class StoreFile {
bloomFilterType = BloomType.NONE;
}
void markCompactedAway() {
this.compactedAway = true;
}
public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
CacheConfig cacheConf, Configuration conf) throws IOException {
reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
@ -1130,11 +1165,35 @@ public class StoreFile {
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread,
boolean isCompaction, long readPt) {
// Increment the ref count
refCount.incrementAndGet();
return new StoreFileScanner(this,
getScanner(cacheBlocks, pread, isCompaction),
!isCompaction, reader.hasMVCCInfo(), readPt);
}
/**
* Decrement the ref count associated with the reader when ever a scanner associated
* with the reader is closed
*/
void decrementRefCount() {
refCount.decrementAndGet();
}
/**
* @return true if the file is still used in reads
*/
public boolean isReferencedInReads() {
return refCount.get() != 0;
}
/**
* @return true if the file is compacted
*/
public boolean isCompactedAway() {
return this.compactedAway;
}
/**
* Warning: Do not write further code which depends on this call. Instead
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
@ -1620,7 +1679,13 @@ public class StoreFile {
private static class GetFileSize implements Function<StoreFile, Long> {
@Override
public Long apply(StoreFile sf) {
- return sf.getReader().length();
+ if (sf.getReader() != null) {
return sf.getReader().length();
} else {
// the reader may be null for the compacted files and if the archiving
// had failed.
return -1L;
}
}
}
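Taken together, refCount and compactedAway give each Reader a small lifecycle protocol: increment when a StoreFileScanner is opened, decrement exactly once when that scanner closes, and allow archiving only when the file is compacted away and the count has returned to zero. A self-contained illustration of that protocol (plain Java, not HBase code):

import java.util.concurrent.atomic.AtomicInteger;

class ReaderRefCountDemo {
  static class Reader {
    final AtomicInteger refCount = new AtomicInteger(0);
    volatile boolean compactedAway = false;

    boolean canBeArchived() {
      // Mirrors isCompactedAway() && !isReferencedInReads() above.
      return compactedAway && refCount.get() == 0;
    }
  }

  static class Scanner implements AutoCloseable {
    private final Reader reader;
    private boolean closed = false;

    Scanner(Reader reader) {
      this.reader = reader;
      reader.refCount.incrementAndGet();  // taken when the scanner is created
    }

    @Override
    public void close() {
      if (closed) return;                 // close() must stay idempotent
      reader.refCount.decrementAndGet();
      closed = true;
    }
  }

  public static void main(String[] args) {
    Reader r = new Reader();
    try (Scanner s = new Scanner(r)) {
      r.compactedAway = true;
      System.out.println(r.canBeArchived()); // false: still referenced by the scanner
    }
    System.out.println(r.canBeArchived());   // true: no readers left
  }
}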

StoreFileManager.java

@ -53,19 +53,33 @@ public interface StoreFileManager {
void insertNewFiles(Collection<StoreFile> sfs) throws IOException;
/**
- * Adds compaction results into the structure.
+ * Adds only the new compaction results into the structure.
* @param compactedFiles The input files for the compaction.
* @param results The resulting files for the compaction.
*/
void addCompactionResults(
Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException;
/**
* Remove the compacted files
* @param compactedFiles the list of compacted files
* @throws IOException
*/
void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException;
/**
* Clears all the files currently in use and returns them.
* @return The files previously in use.
*/
ImmutableCollection<StoreFile> clearFiles();
/**
* Clears all the compacted files and returns them. This method is expected to be
* accessed single threaded.
* @return The files compacted previously.
*/
Collection<StoreFile> clearCompactedFiles();
/**
* Gets the snapshot of the store files currently in use. Can be used for things like metrics
* and checks; should not assume anything about relations between store files in the list.
@ -73,6 +87,15 @@ public interface StoreFileManager {
*/
Collection<StoreFile> getStorefiles();
/**
* List of compacted files inside this store that needs to be excluded in reads
* because further new reads will be using only the newly created files out of compaction.
* These compacted files will be deleted/cleared once all the existing readers on these
* compacted files are done.
* @return the list of compacted files
*/
Collection<StoreFile> getCompactedfiles();
/**
* Returns the number of files currently in use.
* @return The number of files.
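Read together, the additions trace a single lifecycle for a compacted file through the manager. A hedged walk-through of that call order (illustrative; any StoreFileManager implementation would do):

import java.io.IOException;
import java.util.Collection;

class CompactedFileLifecycle {
  static void afterCompaction(StoreFileManager manager, Collection<StoreFile> inputs,
      Collection<StoreFile> outputs) throws IOException {
    // 1. Swap in the compaction outputs; the inputs move to the compacted list
    //    and are marked compacted-away.
    manager.addCompactionResults(inputs, outputs);
    // 2. The discharger later asks which compacted files are still tracked.
    Collection<StoreFile> compacted = manager.getCompactedfiles();
    // 3. Once readers are closed and the files archived, drop them from the manager.
    manager.removeCompactedFiles(compacted);
  }
}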

StoreFileScanner.java

@ -51,6 +51,7 @@ public class StoreFileScanner implements KeyValueScanner {
private final StoreFile.Reader reader;
private final HFileScanner hfs;
private Cell cur = null;
private boolean closed = false;
private boolean realSeekDone;
private boolean delayedReseek;
@ -171,7 +172,7 @@ public class StoreFileScanner implements KeyValueScanner {
try {
try {
if(!seekAtOrAfter(hfs, key)) {
- close();
+ this.cur = null;
return false;
}
@ -198,7 +199,7 @@ public class StoreFileScanner implements KeyValueScanner {
try {
try {
if (!reseekAtOrAfter(hfs, key)) {
- close();
+ this.cur = null;
return false;
}
setCurrentCell(hfs.getKeyValue());
@ -244,7 +245,6 @@ public class StoreFileScanner implements KeyValueScanner {
}
if (cur == null) {
- close();
return false;
}
@ -252,8 +252,12 @@ public class StoreFileScanner implements KeyValueScanner {
}
public void close() {
- // Nothing to close on HFileScanner?
cur = null;
if (closed) return;
if (this.reader != null) {
this.reader.decrementRefCount();
}
closed = true;
} }
/**
@ -454,7 +458,7 @@ public class StoreFileScanner implements KeyValueScanner {
if (seekCount != null) seekCount.incrementAndGet();
if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
seekKey.getKeyLength())) {
- close();
+ this.cur = null;
return false;
}
KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()

StoreScanner.java

@ -121,7 +121,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// A flag whether use pread for scan
private boolean scanUsePread = false;
- protected ReentrantLock lock = new ReentrantLock();
+ // Indicates whether there was flush during the course of the scan
private volatile boolean flushed = false;
// generally we get one file from a flush
private List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
// The current list of scanners
private List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
// flush update lock
private ReentrantLock flushLock = new ReentrantLock();
private final long readPt;
@ -166,6 +173,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
}
protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
this.currentScanners.addAll(scanners);
}
/**
* Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
* are not in a compaction.
@ -203,7 +213,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// set rowOffset
this.storeOffset = scan.getRowOffsetPerColumnFamily();
addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());
}
@ -260,7 +270,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());
}
@ -299,6 +309,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
addCurrentScanners(scanners);
resetKVHeap(scanners, scanInfo.getComparator());
}
@ -392,6 +403,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
scanners.add(kvs);
} else {
kvs.close();
} }
} }
return scanners;
@ -399,15 +412,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public Cell peek() {
- lock.lock();
- try {
if (this.heap == null) {
return this.lastTop;
}
return this.heap.peek();
- } finally {
- lock.unlock();
- }
}
@Override
@ -418,8 +426,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public void close() {
- lock.lock();
- try {
if (this.closing) return;
this.closing = true;
// Under test, we dont have a this.store
@ -429,21 +435,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.heap.close();
this.heap = null; // CLOSED!
this.lastTop = null; // If both are null, we are closed.
- } finally {
- lock.unlock();
- }
}
@Override
public boolean seek(Cell key) throws IOException {
- lock.lock();
- try {
+ boolean flushed = checkFlushed();
// reset matcher state, in case that underlying store changed
- checkReseek();
+ checkReseek(flushed);
return this.heap.seek(key);
- } finally {
- lock.unlock();
- }
}
@Override
@ -459,13 +458,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
*/
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
- lock.lock();
- try {
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
}
- if (checkReseek()) {
+ boolean flushed = checkFlushed();
+ if (checkReseek(flushed)) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
@ -643,9 +640,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// No more keys
close();
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- } finally {
- lock.unlock();
- }
}
/*
@ -682,9 +676,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Implementation of ChangedReadersObserver
@Override
- public void updateReaders() throws IOException {
- lock.lock();
+ public void updateReaders(List<StoreFile> sfs) throws IOException {
+ flushed = true;
+ flushLock.lock();
try {
flushedStoreFiles.addAll(sfs);
} finally {
flushLock.unlock();
}
}
// Implementation of ChangedReadersObserver
protected void nullifyCurrentHeap() throws IOException {
if (this.closing) return;
// All public synchronized API calls will call 'checkReseek' which will cause
@ -695,7 +698,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (this.heap == null) return;
// this could be null.
- this.lastTop = this.peek();
+ this.lastTop = this.heap.peek();
//DebugPrint.println("SS updateReaders, topKey = " + lastTop);
@ -704,18 +707,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
// Let the next() call handle re-creating and seeking
- } finally {
- lock.unlock();
- }
}
/**
* @param flushed indicates if there was a flush
* @return true if top of heap has changed (and KeyValueHeap has to try the
* next KV)
* @throws IOException
*/
- protected boolean checkReseek() throws IOException {
- if (this.heap == null && this.lastTop != null) {
+ protected boolean checkReseek(boolean flushed) throws IOException {
+ if (flushed && this.lastTop != null) {
resetScannerStack(this.lastTop);
if (this.heap.peek() == null
|| store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
@ -731,21 +732,37 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
protected void resetScannerStack(Cell lastTopKey) throws IOException {
- if (heap != null) {
- throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
- }
/* When we have the scan object, should we not pass it to getScanners()
* to get a limited set of scanners? We did so in the constructor and we
- * could have done it now by storing the scan object from the constructor */
- List<KeyValueScanner> scanners = getScannersNoCompaction();
- // Seek all scanners to the initial key
+ * could have done it now by storing the scan object from the constructor
+ */
+ final boolean isCompaction = false;
boolean usePread = get || scanUsePread;
List<KeyValueScanner> scanners = null;
try {
flushLock.lock();
scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
// Clear the current set of flushed store files so that they don't get added again
flushedStoreFiles.clear();
} finally {
flushLock.unlock();
}
// Seek the new scanners to the last key
seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
// remove the older memstore scanner
for (int i = 0; i < currentScanners.size(); i++) {
if (!currentScanners.get(i).isFileScanner()) {
currentScanners.remove(i);
break;
}
}
// add the newly created scanners on the flushed files and the current active memstore scanner
addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
- resetKVHeap(scanners, store.getComparator());
+ resetKVHeap(this.currentScanners, store.getComparator());
// Reset the state of the Query Matcher and set to top row.
// Only reset and call setRow if the row changes; avoids confusing the
// query matcher if scanning intra-row.
@ -796,19 +813,36 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean reseek(Cell kv) throws IOException {
- lock.lock();
- try {
- //Heap will not be null, if this is called from next() which.
- //If called from RegionScanner.reseek(...) make sure the scanner
- //stack is reset if needed.
- checkReseek();
+ boolean flushed = checkFlushed();
+ // Heap will not be null, if this is called from next() which.
+ // If called from RegionScanner.reseek(...) make sure the scanner
+ // stack is reset if needed.
+ checkReseek(flushed);
if (explicitColumnQuery && lazySeekEnabledGlobally) {
return heap.requestSeek(kv, true, useRowColBloom);
}
return heap.reseek(kv);
- } finally {
- lock.unlock();
+ }
protected boolean checkFlushed() {
// check the var without any lock. Suppose even if we see the old
// value here still it is ok to continue because we will not be resetting
// the heap but will continue with the referenced memstore's snapshot. For compactions
// any way we don't need the updateReaders at all to happen as we still continue with
// the older files
if (flushed) {
// If there is a flush and the current scan is notified on the flush ensure that the
// scan's heap gets reset and we do a seek on the newly flushed file.
if(!this.closing) {
this.lastTop = this.peek();
} else {
return false;
}
// reset the flag
flushed = false;
return true;
} }
return false;
} }
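The per-call lock that StoreScanner used to take is gone; the flush path now just sets a volatile flag and appends the flushed files under the small flushLock, and the scanning thread picks the change up lazily through checkFlushed()/resetScannerStack(). A stripped-down illustration of that hand-off (plain Java, not the committed code):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class FlushHandOff<F> {
  private volatile boolean flushed = false;        // cheap cross-thread signal
  private final List<F> flushedFiles = new ArrayList<>();
  private final ReentrantLock flushLock = new ReentrantLock();

  // Flush thread (compare updateReaders(List<StoreFile>)).
  void onFlush(List<F> files) {
    flushed = true;
    flushLock.lock();
    try {
      flushedFiles.addAll(files);
    } finally {
      flushLock.unlock();
    }
  }

  // Scanning thread (compare checkFlushed() and resetScannerStack()).
  List<F> drainIfFlushed() {
    if (!flushed) {
      return null;                                 // fast path: nothing changed
    }
    flushLock.lock();
    try {
      List<F> drained = new ArrayList<>(flushedFiles);
      flushedFiles.clear();
      flushed = false;
      return drained;                              // caller rebuilds its scanner heap
    } finally {
      flushLock.unlock();
    }
  }
}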
@Override

StripeStoreFileManager.java

@ -104,6 +104,7 @@ public class StripeStoreFileManager
/** Cached list of all files in the structure, to return from some calls */
public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
private ImmutableList<StoreFile> allCompactedFilesCached = ImmutableList.<StoreFile>of();
}
private State state = null;
@ -138,9 +139,15 @@ public class StripeStoreFileManager
return state.allFilesCached;
}
@Override
public Collection<StoreFile> getCompactedfiles() {
return state.allCompactedFilesCached;
}
@Override
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
// Passing null does not cause NPE??
cmc.mergeResults(null, sfs);
debugDumpState("Added new files");
}
@ -154,6 +161,13 @@ public class StripeStoreFileManager
return result;
}
@Override
public ImmutableCollection<StoreFile> clearCompactedFiles() {
ImmutableCollection<StoreFile> result = state.allCompactedFilesCached;
this.state = new State();
return result;
}
@Override
public int getStorefileCount() {
return state.allFilesCached.size();
@ -304,9 +318,31 @@ public class StripeStoreFileManager
// copies and apply the result at the end. // copies and apply the result at the end.
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false); CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
cmc.mergeResults(compactedFiles, results); cmc.mergeResults(compactedFiles, results);
markCompactedAway(compactedFiles);
debugDumpState("Merged compaction results"); debugDumpState("Merged compaction results");
} }
// Mark the files as compacted away once the storefiles and compactedfiles lists are finalised.
// A background thread will later close the actual readers on these compacted files and
// evict their blocks from the block cache so that they no longer occupy it.
private void markCompactedAway(Collection<StoreFile> compactedFiles) {
for (StoreFile file : compactedFiles) {
file.markCompactedAway();
}
}
@Override
public void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException {
// See class comment for the assumptions we make here.
LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
// In order to be able to fail in the middle of the operation, we'll operate on lazy
// copies and apply the result at the end.
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
cmc.deleteResults(compactedFiles);
debugDumpState("Deleted compaction results");
}
@Override @Override
public int getStoreCompactionPriority() { public int getStoreCompactionPriority() {
// If there's only L0, do what the default store does. // If there's only L0, do what the default store does.
@ -660,7 +696,7 @@ public class StripeStoreFileManager
this.isFlush = isFlush; this.isFlush = isFlush;
} }
public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results) private void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
throws IOException { throws IOException {
assert this.compactedFiles == null && this.results == null; assert this.compactedFiles == null && this.results == null;
this.compactedFiles = compactedFiles; this.compactedFiles = compactedFiles;
@ -672,12 +708,20 @@ public class StripeStoreFileManager
processNewCandidateStripes(newStripes); processNewCandidateStripes(newStripes);
} }
// Create new state and update parent. // Create new state and update parent.
State state = createNewState(); State state = createNewState(false);
StripeStoreFileManager.this.state = state; StripeStoreFileManager.this.state = state;
updateMetadataMaps(); updateMetadataMaps();
} }
private State createNewState() { private void deleteResults(Collection<StoreFile> compactedFiles) throws IOException {
this.compactedFiles = compactedFiles;
// Create new state and update parent.
State state = createNewState(true);
StripeStoreFileManager.this.state = state;
updateMetadataMaps();
}
private State createNewState(boolean delCompactedFiles) {
State oldState = StripeStoreFileManager.this.state; State oldState = StripeStoreFileManager.this.state;
// Stripe count should be the same unless the end rows changed. // Stripe count should be the same unless the end rows changed.
assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null; assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
@ -693,9 +737,21 @@ public class StripeStoreFileManager
} }
List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached); List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
if (!isFlush) newAllFiles.removeAll(compactedFiles); List<StoreFile> newAllCompactedFiles =
newAllFiles.addAll(results); new ArrayList<StoreFile>(oldState.allCompactedFilesCached);
if (!isFlush) {
newAllFiles.removeAll(compactedFiles);
if (delCompactedFiles) {
newAllCompactedFiles.removeAll(compactedFiles);
} else {
newAllCompactedFiles.addAll(compactedFiles);
}
}
if (results != null) {
newAllFiles.addAll(results);
}
newState.allFilesCached = ImmutableList.copyOf(newAllFiles); newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles);
return newState; return newState;
} }
@ -946,14 +1002,16 @@ public class StripeStoreFileManager
// Order by seqnum is reversed. // Order by seqnum is reversed.
for (int i = 1; i < stripe.size(); ++i) { for (int i = 1; i < stripe.size(); ++i) {
StoreFile sf = stripe.get(i); StoreFile sf = stripe.get(i);
long fileTs = sf.getReader().getMaxTimestamp(); synchronized (sf) {
if (fileTs < maxTs && !filesCompacting.contains(sf)) { long fileTs = sf.getReader().getMaxTimestamp();
LOG.info("Found an expired store file: " + sf.getPath() if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+ " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs); LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimeStamp is "
if (expiredStoreFiles == null) { + fileTs + ", which is below " + maxTs);
expiredStoreFiles = new ArrayList<StoreFile>(); if (expiredStoreFiles == null) {
expiredStoreFiles = new ArrayList<StoreFile>();
}
expiredStoreFiles.add(sf);
} }
expiredStoreFiles.add(sf);
} }
} }
return expiredStoreFiles; return expiredStoreFiles;

View File
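The StripeStoreFileManager changes above split compaction bookkeeping into two phases: addCompactionResults(...) moves the compacted inputs into a separate compacted-files list (and marks them compacted away) while ongoing scanners may still reference them, and removeCompactedFiles(...) later drops them once the discharger has closed and archived them. A rough, self-contained sketch of that copy-on-write handling (generic names, not the real manager):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class CompactedFilesState<F> {
  private volatile List<F> live = Collections.emptyList();
  private volatile List<F> compacted = Collections.emptyList();

  synchronized void addCompactionResults(Collection<F> inputs, Collection<F> outputs) {
    List<F> newLive = new ArrayList<>(live);
    newLive.removeAll(inputs);
    newLive.addAll(outputs);
    List<F> newCompacted = new ArrayList<>(compacted);
    newCompacted.addAll(inputs);  // keep inputs visible for readers still using them
    live = Collections.unmodifiableList(newLive);
    compacted = Collections.unmodifiableList(newCompacted);
  }

  synchronized void removeCompactedFiles(Collection<F> done) {
    List<F> newCompacted = new ArrayList<>(compacted);
    newCompacted.removeAll(done);  // closing/archiving of the files happens elsewhere
    compacted = Collections.unmodifiableList(newCompacted);
  }
}

The real manager rebuilds a whole immutable State object per mutation for the same reason: readers always see a consistent snapshot of both lists.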

@ -55,6 +55,8 @@ public class CompactionConfiguration {
public static final String HBASE_HSTORE_COMPACTION_MIN_SIZE_KEY = public static final String HBASE_HSTORE_COMPACTION_MIN_SIZE_KEY =
"hbase.hstore.compaction.min.size"; "hbase.hstore.compaction.min.size";
public static final String HBASE_HSTORE_COMPACTION_MAX_KEY = "hbase.hstore.compaction.max"; public static final String HBASE_HSTORE_COMPACTION_MAX_KEY = "hbase.hstore.compaction.max";
public static final String HBASE_HSTORE_COMPACTION_DISCHARGER_THREAD_COUNT =
"hbase.hstore.compaction.discharger.thread.count";
public static final String HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY = public static final String HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY =
"hbase.hstore.compaction.max.size"; "hbase.hstore.compaction.max.size";
public static final String HBASE_HSTORE_COMPACTION_MAX_SIZE_OFFPEAK_KEY = public static final String HBASE_HSTORE_COMPACTION_MAX_SIZE_OFFPEAK_KEY =
@ -64,6 +66,11 @@ public class CompactionConfiguration {
public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT = public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT =
"hbase.hstore.min.locality.to.skip.major.compact"; "hbase.hstore.min.locality.to.skip.major.compact";
public static final String HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT =
"hbase.hfile.compaction.discharger.thread.count";
public static final String HBASE_HFILE_COMPACTION_DISCHARGER_INTERVAL =
"hbase.hfile.compaction.discharger.interval";
Configuration conf; Configuration conf;
StoreConfigInformation storeConfigInfo; StoreConfigInformation storeConfigInfo;

View File
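The new CompactionConfiguration keys above control the compacted-files discharger. A hypothetical way to tune them from code (the values are illustrative, and the interval is assumed to be in milliseconds):

import org.apache.hadoop.conf.Configuration;

public class DischargerConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // How many discharger threads the region server may use (illustrative value).
    conf.setInt("hbase.hfile.compaction.discharger.thread.count", 4);
    // How often the discharger chore fires (illustrative value).
    conf.setInt("hbase.hfile.compaction.discharger.interval", 2 * 60 * 1000);
  }
}

The same keys could equally be set in hbase-site.xml; the Configuration calls are just the shortest way to show them.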

@ -108,6 +108,11 @@ public class MockRegionServerServices implements RegionServerServices {
return null; return null;
} }
@Override
public List<Region> getOnlineRegions() {
return null;
}
@Override @Override
public void addToOnlineRegions(Region r) { public void addToOnlineRegions(Region r) {
this.regions.put(r.getRegionInfo().getEncodedName(), r); this.regions.put(r.getRegionInfo().getEncodedName(), r);

View File

@ -196,17 +196,6 @@ public class TestIOFencing {
r = (CompactionBlockerRegion) region; r = (CompactionBlockerRegion) region;
} }
@Override
protected void completeCompaction(final Collection<StoreFile> compactedFiles,
boolean removeFiles) throws IOException {
try {
r.compactionsWaiting.countDown();
r.compactionsBlocked.await();
} catch (InterruptedException ex) {
throw new IOException(ex);
}
super.completeCompaction(compactedFiles, removeFiles);
}
@Override @Override
protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException { protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException {
try { try {

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.backup.example;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -43,7 +45,10 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
@ -76,6 +81,7 @@ public class TestZooKeeperTableArchiveClient {
private static ZKTableArchiveClient archivingClient; private static ZKTableArchiveClient archivingClient;
private final List<Path> toCleanup = new ArrayList<Path>(); private final List<Path> toCleanup = new ArrayList<Path>();
private static ClusterConnection CONNECTION; private static ClusterConnection CONNECTION;
private static RegionServerServices rss;
/** /**
* Setup the config for the cluster * Setup the config for the cluster
@ -90,6 +96,7 @@ public class TestZooKeeperTableArchiveClient {
ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher(); ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher); String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
ZKUtil.createWithParents(watcher, archivingZNode); ZKUtil.createWithParents(watcher, archivingZNode);
rss = mock(RegionServerServices.class);
} }
private static void setupConf(Configuration conf) { private static void setupConf(Configuration conf) {
@ -169,10 +176,14 @@ public class TestZooKeeperTableArchiveClient {
// create the region // create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
List<Region> regions = new ArrayList<Region>();
regions.add(region);
when(rss.getOnlineRegions()).thenReturn(regions);
final CompactedHFilesDischarger compactionCleaner =
new CompactedHFilesDischarger(100, stop, rss, false);
loadFlushAndCompact(region, TEST_FAM); loadFlushAndCompact(region, TEST_FAM);
compactionCleaner.chore();
// get the current hfiles in the archive directory // get the current hfiles in the archive directory
List<Path> files = getAllFiles(fs, archiveDir); List<Path> files = getAllFiles(fs, archiveDir);
if (files == null) { if (files == null) {
@ -216,18 +227,28 @@ public class TestZooKeeperTableArchiveClient {
HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
// create the region // create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM); HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd); HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
List<Region> regions = new ArrayList<Region>();
regions.add(region);
when(rss.getOnlineRegions()).thenReturn(regions);
final CompactedHFilesDischarger compactionCleaner =
new CompactedHFilesDischarger(100, stop, rss, false);
loadFlushAndCompact(region, TEST_FAM); loadFlushAndCompact(region, TEST_FAM);
compactionCleaner.chore();
// create the another table that we don't archive // create the another table that we don't archive
hcd = new HColumnDescriptor(TEST_FAM); hcd = new HColumnDescriptor(TEST_FAM);
Region otherRegion = UTIL.createTestRegion(otherTable, hcd); HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
regions = new ArrayList<Region>();
regions.add(otherRegion);
when(rss.getOnlineRegions()).thenReturn(regions);
final CompactedHFilesDischarger compactionCleaner1 = new CompactedHFilesDischarger(100, stop,
rss, false);
loadFlushAndCompact(otherRegion, TEST_FAM); loadFlushAndCompact(otherRegion, TEST_FAM);
compactionCleaner1.chore();
// get the current hfiles in the archive directory // get the current hfiles in the archive directory
// Should be archived
List<Path> files = getAllFiles(fs, archiveDir); List<Path> files = getAllFiles(fs, archiveDir);
if (files == null) { if (files == null) {
FSUtils.logFileSystemState(fs, archiveDir, LOG); FSUtils.logFileSystemState(fs, archiveDir, LOG);

View File
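The test changes above (and in several files below) repeat the same wiring: a mocked RegionServerServices that reports the regions under test, and a CompactedHFilesDischarger whose chore() is invoked by hand instead of being scheduled. A sketch of that helper, assuming only the APIs visible in these hunks:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;

class DischargerTestHelper {
  /** Runs the discharger once against the given region (test-only sketch). */
  static void runDischargerOnce(Region region, Stoppable stop) {
    RegionServerServices rss = mock(RegionServerServices.class);
    List<Region> regions = new ArrayList<Region>();
    regions.add(region);
    when(rss.getOnlineRegions()).thenReturn(regions);
    CompactedHFilesDischarger discharger =
        new CompactedHFilesDischarger(100, stop, rss, false);
    // Closes the readers of compacted-away files and archives them.
    discharger.chore();
  }
}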

@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader; import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -94,6 +96,8 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule; import org.junit.rules.TestRule;
import org.mockito.Mockito; import org.mockito.Mockito;
import com.google.common.collect.Lists;
/** /**
* Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}. * Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}.
* Sets up and runs a mapreduce job that writes hfile output. * Sets up and runs a mapreduce job that writes hfile output.
@ -996,6 +1000,12 @@ public class TestHFileOutputFormat2 {
quickPoll(new Callable<Boolean>() { quickPoll(new Callable<Boolean>() {
@Override @Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
for (HRegion region : regions) {
for (Store store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
}
}
return fs.listStatus(storePath).length == 1; return fs.listStatus(storePath).length == 1;
} }
}, 5000); }, 5000);
@ -1009,6 +1019,12 @@ public class TestHFileOutputFormat2 {
quickPoll(new Callable<Boolean>() { quickPoll(new Callable<Boolean>() {
@Override @Override
public Boolean call() throws Exception { public Boolean call() throws Exception {
List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
for (HRegion region : regions) {
for (Store store : region.getStores()) {
store.closeAndArchiveCompactedFiles();
}
}
return fs.listStatus(storePath).length == 1; return fs.listStatus(storePath).length == 1;
} }
}, 5000); }, 5000);

View File

@ -460,6 +460,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return null; return null;
} }
@Override
public List<Region> getOnlineRegions() {
return null;
}
@Override @Override
public OpenRegionResponse openRegion(RpcController controller, public OpenRegionResponse openRegion(RpcController controller,
OpenRegionRequest request) throws ServiceException { OpenRegionRequest request) throws ServiceException {

View File

@ -47,8 +47,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnaps
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -120,6 +123,7 @@ public class TestSnapshotFromMaster {
conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod); conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
ConstantSizeRegionSplitPolicy.class.getName()); ConstantSizeRegionSplitPolicy.class.getName());
conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
} }
@ -320,6 +324,17 @@ public class TestSnapshotFromMaster {
region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it. region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
region.compactStores(); // min is 2 so will compact and archive region.compactStores(); // min is 2 so will compact and archive
} }
List<RegionServerThread> regionServerThreads = UTIL.getMiniHBaseCluster()
.getRegionServerThreads();
HRegionServer hrs = null;
for (RegionServerThread rs : regionServerThreads) {
if (!rs.getRegionServer().getOnlineRegions(TABLE_NAME).isEmpty()) {
hrs = rs.getRegionServer();
break;
}
}
CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
cleaner.chore();
LOG.info("After compaction File-System state"); LOG.info("After compaction File-System state");
FSUtils.logFileSystemState(fs, rootDir, LOG); FSUtils.logFileSystemState(fs, rootDir, LOG);

View File
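For mini-cluster tests like the one above, the extra step is to find whichever region server actually hosts the table before driving the discharger. A sketch of that lookup, assuming the cluster-util APIs used in the hunk:

import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;

class RunDischargerOnHostingRS {
  static void run(List<RegionServerThread> threads, TableName table) throws Exception {
    HRegionServer hrs = null;
    for (RegionServerThread t : threads) {
      // Pick the server that is actually serving at least one region of the table.
      if (!t.getRegionServer().getOnlineRegions(table).isEmpty()) {
        hrs = t.getRegionServer();
        break;
      }
    }
    if (hrs != null) {
      // Same constructor arguments as the test above; a null Stoppable is accepted there.
      new CompactedHFilesDischarger(100, null, hrs, false).chore();
    }
  }
}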

@ -78,6 +78,11 @@ public class MockStoreFile extends StoreFile {
return false; return false;
} }
@Override
public boolean isCompactedAway() {
return false;
}
@Override @Override
public byte[] getMetadataValue(byte[] key) { public byte[] getMetadataValue(byte[] key) {
return this.metadata.get(key); return this.metadata.get(key);

View File

@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.IOException;
import java.security.Key; import java.security.Key;
import java.security.SecureRandom; import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import javax.crypto.spec.SecretKeySpec; import javax.crypto.spec.SecretKeySpec;
@ -122,13 +124,14 @@ public class TestEncryptionKeyRotation {
// And major compact // And major compact
TEST_UTIL.getHBaseAdmin().majorCompact(htd.getTableName()); TEST_UTIL.getHBaseAdmin().majorCompact(htd.getTableName());
final List<Path> updatePaths = findCompactedStorefilePaths(htd.getTableName());
TEST_UTIL.waitFor(30000, 1000, true, new Predicate<Exception>() { TEST_UTIL.waitFor(30000, 1000, true, new Predicate<Exception>() {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
// When compaction has finished, all of the original files will be // When compaction has finished, all of the original files will be
// gone // gone
boolean found = false; boolean found = false;
for (Path path: initialPaths) { for (Path path: updatePaths) {
found = TEST_UTIL.getTestFileSystem().exists(path); found = TEST_UTIL.getTestFileSystem().exists(path);
if (found) { if (found) {
LOG.info("Found " + path); LOG.info("Found " + path);
@ -140,14 +143,20 @@ public class TestEncryptionKeyRotation {
}); });
// Verify we have store file(s) with only the new key // Verify we have store file(s) with only the new key
Thread.sleep(1000);
waitForCompaction(htd.getTableName());
List<Path> pathsAfterCompaction = findStorefilePaths(htd.getTableName()); List<Path> pathsAfterCompaction = findStorefilePaths(htd.getTableName());
assertTrue(pathsAfterCompaction.size() > 0); assertTrue(pathsAfterCompaction.size() > 0);
for (Path path: pathsAfterCompaction) { for (Path path: pathsAfterCompaction) {
assertFalse("Store file " + path + " retains initial key",
Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
assertTrue("Store file " + path + " has incorrect key", assertTrue("Store file " + path + " has incorrect key",
Bytes.equals(secondCFKey.getEncoded(), extractHFileKey(path))); Bytes.equals(secondCFKey.getEncoded(), extractHFileKey(path)));
} }
List<Path> compactedPaths = findCompactedStorefilePaths(htd.getTableName());
assertTrue(compactedPaths.size() > 0);
for (Path path: compactedPaths) {
assertTrue("Store file " + path + " retains initial key",
Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
}
} }
@Test @Test
@ -193,6 +202,33 @@ public class TestEncryptionKeyRotation {
} }
} }
private static void waitForCompaction(TableName tableName)
throws IOException, InterruptedException {
boolean compacted = false;
for (Region region : TEST_UTIL.getRSForFirstRegionInTable(tableName)
.getOnlineRegions(tableName)) {
for (Store store : region.getStores()) {
compacted = false;
while (!compacted) {
if (store.getStorefiles() != null) {
while (store.getStorefilesCount() != 1) {
Thread.sleep(100);
}
for (StoreFile storefile : store.getStorefiles()) {
if (!storefile.isCompactedAway()) {
compacted = true;
break;
}
Thread.sleep(100);
}
} else {
break;
}
}
}
}
}
private static List<Path> findStorefilePaths(TableName tableName) throws Exception { private static List<Path> findStorefilePaths(TableName tableName) throws Exception {
List<Path> paths = new ArrayList<Path>(); List<Path> paths = new ArrayList<Path>();
for (Region region: for (Region region:
@ -206,6 +242,23 @@ public class TestEncryptionKeyRotation {
return paths; return paths;
} }
private static List<Path> findCompactedStorefilePaths(TableName tableName) throws Exception {
List<Path> paths = new ArrayList<Path>();
for (Region region:
TEST_UTIL.getRSForFirstRegionInTable(tableName).getOnlineRegions(tableName)) {
for (Store store : region.getStores()) {
Collection<StoreFile> compactedfiles =
((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
if (compactedfiles != null) {
for (StoreFile storefile : compactedfiles) {
paths.add(storefile.getPath());
}
}
}
}
return paths;
}
private void createTableAndFlush(HTableDescriptor htd) throws Exception { private void createTableAndFlush(HTableDescriptor htd) throws Exception {
HColumnDescriptor hcd = htd.getFamilies().iterator().next(); HColumnDescriptor hcd = htd.getFamilies().iterator().next();
// Create the test table // Create the test table

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -166,9 +167,17 @@ public class TestHRegionReplayEvents {
when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
when(rss.getConfiguration()).thenReturn(CONF); when(rss.getConfiguration()).thenReturn(CONF);
when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting()); when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
.toString();
ExecutorService es = new ExecutorService(string);
es.startExecutorService(string + "-" + string, 1);
when(rss.getExecutorService()).thenReturn(es);
primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary); primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
primaryRegion.close(); primaryRegion.close();
List<Region> regions = new ArrayList<Region>();
regions.add(primaryRegion);
when(rss.getOnlineRegions()).thenReturn(regions);
primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null); primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null); secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
@ -1369,6 +1378,11 @@ public class TestHRegionReplayEvents {
// Test case 3: compact primary files // Test case 3: compact primary files
primaryRegion.compactStores(); primaryRegion.compactStores();
List<Region> regions = new ArrayList<Region>();
regions.add(primaryRegion);
when(rss.getOnlineRegions()).thenReturn(regions);
CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
cleaner.chore();
secondaryRegion.refreshStoreFiles(); secondaryRegion.refreshStoreFiles();
assertPathListsEqual(primaryRegion.getStoreFileList(families), assertPathListsEqual(primaryRegion.getStoreFileList(families),
secondaryRegion.getStoreFileList(families)); secondaryRegion.getStoreFileList(families));

View File
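The replay test above also wires a real ExecutorService into the mocked RegionServerServices and starts a RS_COMPACTED_FILES_DISCHARGER pool before opening the region, presumably so the region has somewhere to submit discharger work. A sketch of that setup (class and pool names are illustrative):

import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;

class DischargerExecutorHelper {
  static ExecutorService newExecutorForTests() {
    ExecutorService es = new ExecutorService("test-server");
    // One thread is enough for a single-region unit test.
    es.startExecutorService(EventType.RS_COMPACTED_FILES_DISCHARGER.toString(), 1);
    return es;
  }
}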

@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType; import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -180,7 +182,7 @@ public class TestRegionMergeTransactionOnCluster {
List<Pair<HRegionInfo, ServerName>> tableRegions = MetaTableAccessor List<Pair<HRegionInfo, ServerName>> tableRegions = MetaTableAccessor
.getTableRegionsAndLocations(master.getZooKeeper(), master.getConnection(), tableName); .getTableRegionsAndLocations(master.getZooKeeper(), master.getConnection(), tableName);
HRegionInfo mergedRegionInfo = tableRegions.get(0).getFirst(); HRegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
HTableDescriptor tableDescritor = master.getTableDescriptors().get( HTableDescriptor tableDescriptor = master.getTableDescriptors().get(
tableName); tableName);
Result mergedRegionResult = MetaTableAccessor.getRegionResult( Result mergedRegionResult = MetaTableAccessor.getRegionResult(
master.getConnection(), mergedRegionInfo.getRegionName()); master.getConnection(), mergedRegionInfo.getRegionName());
@ -205,19 +207,46 @@ public class TestRegionMergeTransactionOnCluster {
assertTrue(fs.exists(regionAdir)); assertTrue(fs.exists(regionAdir));
assertTrue(fs.exists(regionBdir)); assertTrue(fs.exists(regionBdir));
admin.compactRegion(mergedRegionInfo.getRegionName()); HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
// wait until merged region doesn't have reference file
long timeout = System.currentTimeMillis() + waitTime;
HRegionFileSystem hrfs = new HRegionFileSystem( HRegionFileSystem hrfs = new HRegionFileSystem(
TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo); TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
int count = 0;
for(HColumnDescriptor colFamily : columnFamilies) {
count += hrfs.getStoreFiles(colFamily.getName()).size();
}
admin.compactRegion(mergedRegionInfo.getRegionName());
// compact and then clean up the merged region's store files
// first wait until the compaction adds a new store file to the merged region
long timeout = System.currentTimeMillis() + waitTime;
int newcount = 0;
while (System.currentTimeMillis() < timeout) { while (System.currentTimeMillis() < timeout) {
if (!hrfs.hasReferences(tableDescritor)) { for(HColumnDescriptor colFamily : columnFamilies) {
newcount += hrfs.getStoreFiles(colFamily.getName()).size();
}
if(newcount > count) {
break;
}
Thread.sleep(50);
}
assertTrue(newcount > count);
List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
.getRegionServerThreads();
for (RegionServerThread rs : regionServerThreads) {
CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null,
rs.getRegionServer(), false);
cleaner.chore();
Thread.sleep(1000);
}
int newcount1 = 0;
while (System.currentTimeMillis() < timeout) {
for(HColumnDescriptor colFamily : columnFamilies) {
newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
}
if(newcount1 <= 1) {
break; break;
} }
Thread.sleep(50); Thread.sleep(50);
} }
assertFalse(hrfs.hasReferences(tableDescritor));
// run CatalogJanitor to clean merge references in hbase:meta and archive the // run CatalogJanitor to clean merge references in hbase:meta and archive the
// files of merging regions // files of merging regions
int cleaned = admin.runCatalogScan(); int cleaned = admin.runCatalogScan();

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*; import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign; import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -254,6 +256,7 @@ public class TestRegionReplicas {
LOG.info("Flushing primary region"); LOG.info("Flushing primary region");
Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName()); Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
region.flush(true); region.flush(true);
HRegion primaryRegion = (HRegion) region;
// ensure that chore is run // ensure that chore is run
LOG.info("Sleeping for " + (4 * refreshPeriod)); LOG.info("Sleeping for " + (4 * refreshPeriod));
@ -283,7 +286,7 @@ public class TestRegionReplicas {
assertGetRpc(hriSecondary, 1042, true); assertGetRpc(hriSecondary, 1042, true);
assertGetRpc(hriSecondary, 2042, true); assertGetRpc(hriSecondary, 2042, true);
// ensure that we are see the 3 store files // ensure that we see the 3 store files
Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount()); Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());
// force compaction // force compaction
@ -298,7 +301,8 @@ public class TestRegionReplicas {
} }
// ensure that we see the compacted file only // ensure that we see the compacted file only
Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount()); // This will be 4 until the cleaner chore runs
Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());
} finally { } finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000); HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
@ -457,7 +461,19 @@ public class TestRegionReplicas {
LOG.info("Force Major compaction on primary region " + hriPrimary); LOG.info("Force Major compaction on primary region " + hriPrimary);
primaryRegion.compact(true); primaryRegion.compact(true);
Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount()); Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
List<RegionServerThread> regionServerThreads = HTU.getMiniHBaseCluster()
.getRegionServerThreads();
HRegionServer hrs = null;
for (RegionServerThread rs : regionServerThreads) {
if (rs.getRegionServer()
.getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
hrs = rs.getRegionServer();
break;
}
}
CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(100, null, hrs, false);
cleaner.chore();
// scan all the hfiles on the secondary. // scan all the hfiles on the secondary.
// since there are no read on the secondary when we ask locations to // since there are no read on the secondary when we ask locations to
// the NN a FileNotFound exception will be returned and the FileLink // the NN a FileNotFound exception will be returned and the FileLink

View File

@ -1027,6 +1027,18 @@ public class TestStore {
store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf)); store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
} }
private void closeCompactedFile(int index) throws IOException {
Collection<StoreFile> files =
this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
StoreFile sf = null;
Iterator<StoreFile> it = files.iterator();
for (int i = 0; i <= index; i++) {
sf = it.next();
}
sf.closeReader(true);
store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
}
@Test @Test
public void testRefreshStoreFiles() throws Exception { public void testRefreshStoreFiles() throws Exception {
init(name.getMethodName()); init(name.getMethodName());
@ -1054,6 +1066,7 @@ public class TestStore {
store.refreshStoreFiles(); store.refreshStoreFiles();
assertEquals(5, this.store.getStorefilesCount()); assertEquals(5, this.store.getStorefilesCount());
closeCompactedFile(0);
archiveStoreFile(0); archiveStoreFile(0);
assertEquals(5, this.store.getStorefilesCount()); assertEquals(5, this.store.getStorefilesCount());

View File

@ -452,9 +452,9 @@ public class TestStoreScanner extends TestCase {
// normally cause an NPE because scan.store is null. So as long as we get through these // normally cause an NPE because scan.store is null. So as long as we get through these
// two calls we are good and the bug was quashed. // two calls we are good and the bug was quashed.
scan.updateReaders(); scan.updateReaders(new ArrayList<StoreFile>());
scan.updateReaders(); scan.updateReaders(new ArrayList<StoreFile>());
scan.peek(); scan.peek();
} }

View File

@ -137,6 +137,7 @@ public class TestStripeStoreFileManager {
MockStoreFile stripe0a = createFile(0, 100, OPEN_KEY, KEY_B), MockStoreFile stripe0a = createFile(0, 100, OPEN_KEY, KEY_B),
stripe1 = createFile(KEY_B, OPEN_KEY); stripe1 = createFile(KEY_B, OPEN_KEY);
manager.addCompactionResults(al(l0File), al(stripe0a, stripe1)); manager.addCompactionResults(al(l0File), al(stripe0a, stripe1));
manager.removeCompactedFiles(al(l0File));
// If we want a key <= KEY_A, we should get everything except stripe1. // If we want a key <= KEY_A, we should get everything except stripe1.
ArrayList<StoreFile> sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_A)); ArrayList<StoreFile> sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_A));
assertEquals(2, sfsDump.size()); assertEquals(2, sfsDump.size());
@ -162,6 +163,7 @@ public class TestStripeStoreFileManager {
// a candidate from the first file, the old one should not be removed. // a candidate from the first file, the old one should not be removed.
StoreFile stripe0b = createFile(0, 101, OPEN_KEY, KEY_B); StoreFile stripe0b = createFile(0, 101, OPEN_KEY, KEY_B);
manager.addCompactionResults(al(l0File2), al(stripe0b)); manager.addCompactionResults(al(l0File2), al(stripe0b));
manager.removeCompactedFiles(al(l0File2));
sfs = manager.getCandidateFilesForRowKeyBefore(KV_A); sfs = manager.getCandidateFilesForRowKeyBefore(KV_A);
assertEquals(stripe0b, sfs.next()); assertEquals(stripe0b, sfs.next());
sfs.remove(); sfs.remove();
@ -350,10 +352,12 @@ public class TestStripeStoreFileManager {
// Here, [B, C] is logically [B, inf), so we should be able to compact it to that only. // Here, [B, C] is logically [B, inf), so we should be able to compact it to that only.
verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C))); verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
manager.addCompactionResults(al(sf), al(createFile(KEY_B, OPEN_KEY))); manager.addCompactionResults(al(sf), al(createFile(KEY_B, OPEN_KEY)));
manager.removeCompactedFiles(al(sf));
// Do the same for other variants. // Do the same for other variants.
manager = createManager(al(sf, createFile(KEY_C, OPEN_KEY))); manager = createManager(al(sf, createFile(KEY_C, OPEN_KEY)));
verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C))); verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, KEY_C))); manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, KEY_C)));
manager.removeCompactedFiles(al(sf));
manager = createManager(al(sf)); manager = createManager(al(sf));
verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C))); verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, OPEN_KEY))); manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, OPEN_KEY)));
@ -379,6 +383,7 @@ public class TestStripeStoreFileManager {
StoreFile sf_B2C_0 = createFile(KEY_B, KEY_C); StoreFile sf_B2C_0 = createFile(KEY_B, KEY_C);
StoreFile sf_C2i_0 = createFile(KEY_C, OPEN_KEY); StoreFile sf_C2i_0 = createFile(KEY_C, OPEN_KEY);
manager.addCompactionResults(al(sf_L0_0a), al(sf_i2B_0, sf_B2C_0, sf_C2i_0)); manager.addCompactionResults(al(sf_L0_0a), al(sf_i2B_0, sf_B2C_0, sf_C2i_0));
manager.removeCompactedFiles(al(sf_L0_0a));
verifyAllFiles(manager, al(sf_L0_0b, sf_i2B_0, sf_B2C_0, sf_C2i_0)); verifyAllFiles(manager, al(sf_L0_0b, sf_i2B_0, sf_B2C_0, sf_C2i_0));
// Add another l0 file, "compact" both L0 into two stripes // Add another l0 file, "compact" both L0 into two stripes
@ -387,51 +392,61 @@ public class TestStripeStoreFileManager {
StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C); StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C);
manager.insertNewFiles(al(sf_L0_1)); manager.insertNewFiles(al(sf_L0_1));
manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1)); manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1));
manager.removeCompactedFiles(al(sf_L0_0b, sf_L0_1));
verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1)); verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1));
// Try compacting with invalid file (no metadata) - should add files to L0. // Try compacting with invalid file (no metadata) - should add files to L0.
StoreFile sf_L0_2 = createFile(null, null); StoreFile sf_L0_2 = createFile(null, null);
manager.addCompactionResults(al(), al(sf_L0_2)); manager.addCompactionResults(al(), al(sf_L0_2));
manager.removeCompactedFiles(al());
verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1, sf_L0_2)); verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1, sf_L0_2));
// Remove it... // Remove it...
manager.addCompactionResults(al(sf_L0_2), al()); manager.addCompactionResults(al(sf_L0_2), al());
manager.removeCompactedFiles(al(sf_L0_2));
// Do regular compaction in the first stripe. // Do regular compaction in the first stripe.
StoreFile sf_i2B_3 = createFile(OPEN_KEY, KEY_B); StoreFile sf_i2B_3 = createFile(OPEN_KEY, KEY_B);
manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3)); manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3));
manager.removeCompactedFiles(al(sf_i2B_0, sf_i2B_1));
verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3)); verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3));
// Rebalance two stripes. // Rebalance two stripes.
StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D); StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D);
StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY); StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY);
manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4)); manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4));
manager.removeCompactedFiles(al(sf_B2C_0, sf_C2i_0, sf_B2C_1));
verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4)); verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4));
// Split the first stripe. // Split the first stripe.
StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A); StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A);
StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B); StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B);
manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5)); manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5));
manager.removeCompactedFiles(al(sf_i2B_3));
verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5)); verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5));
// Split the middle stripe. // Split the middle stripe.
StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C); StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C);
StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D); StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D);
manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6)); manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6));
manager.removeCompactedFiles(al(sf_B2D_4));
verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6)); verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6));
// Merge two different middle stripes. // Merge two different middle stripes.
StoreFile sf_A2C_7 = createFile(KEY_A, KEY_C); StoreFile sf_A2C_7 = createFile(KEY_A, KEY_C);
manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7)); manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7));
manager.removeCompactedFiles(al(sf_A2B_5, sf_B2C_6));
verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7)); verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7));
// Merge lower half. // Merge lower half.
StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C); StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C);
manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8)); manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8));
manager.removeCompactedFiles(al(sf_i2A_5, sf_A2C_7));
verifyAllFiles(manager, al(sf_D2i_4, sf_C2D_6, sf_i2C_8)); verifyAllFiles(manager, al(sf_D2i_4, sf_C2D_6, sf_i2C_8));
// Merge all. // Merge all.
StoreFile sf_i2i_9 = createFile(OPEN_KEY, OPEN_KEY); StoreFile sf_i2i_9 = createFile(OPEN_KEY, OPEN_KEY);
manager.addCompactionResults(al(sf_D2i_4, sf_C2D_6, sf_i2C_8), al(sf_i2i_9)); manager.addCompactionResults(al(sf_D2i_4, sf_C2D_6, sf_i2C_8), al(sf_i2i_9));
manager.removeCompactedFiles(al(sf_D2i_4, sf_C2D_6, sf_i2C_8));
verifyAllFiles(manager, al(sf_i2i_9)); verifyAllFiles(manager, al(sf_i2i_9));
} }
@ -451,12 +466,14 @@ public class TestStripeStoreFileManager {
verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i); verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i);
// Remove these files. // Remove these files.
sfm.addCompactionResults(al(sf_i2d, sf_d2i), al()); sfm.addCompactionResults(al(sf_i2d, sf_d2i), al());
sfm.removeCompactedFiles(al(sf_i2d, sf_d2i));
assertEquals(0, sfm.getLevel0Files().size()); assertEquals(0, sfm.getLevel0Files().size());
// Add another file to stripe; then "rebalance" stripes w/o it - the file, which was // Add another file to stripe; then "rebalance" stripes w/o it - the file, which was
// presumably flushed during compaction, should go to L0. // presumably flushed during compaction, should go to L0.
StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C); StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C);
sfm.insertNewFiles(al(sf_i2c_2)); sfm.insertNewFiles(al(sf_i2c_2));
sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i)); sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i));
sfm.removeCompactedFiles(al(sf_i2c, sf_c2i));
assertEquals(1, sfm.getLevel0Files().size()); assertEquals(1, sfm.getLevel0Files().size());
verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2); verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2);
} }
@ -472,9 +489,11 @@ public class TestStripeStoreFileManager {
ArrayList<StoreFile> compacted = al(createFile(OPEN_KEY, KEY_B), ArrayList<StoreFile> compacted = al(createFile(OPEN_KEY, KEY_B),
createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY)); createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY));
manager.addCompactionResults(al(sf0a), compacted); manager.addCompactionResults(al(sf0a), compacted);
manager.removeCompactedFiles(al(sf0a));
// Next L0 compaction only produces file for the first and last stripe. // Next L0 compaction only produces file for the first and last stripe.
ArrayList<StoreFile> compacted2 = al(createFile(OPEN_KEY, KEY_B), createFile(KEY_C, OPEN_KEY)); ArrayList<StoreFile> compacted2 = al(createFile(OPEN_KEY, KEY_B), createFile(KEY_C, OPEN_KEY));
manager.addCompactionResults(al(sf0b), compacted2); manager.addCompactionResults(al(sf0b), compacted2);
manager.removeCompactedFiles(al(sf0b));
compacted.addAll(compacted2); compacted.addAll(compacted2);
verifyAllFiles(manager, compacted); verifyAllFiles(manager, compacted);
} }

View File
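A consequence visible throughout the stripe tests above: addCompactionResults(...) no longer drops the compacted inputs by itself, so every call is now paired with removeCompactedFiles(...) on the same inputs. In isolation the pairing looks like this (variable names are placeholders):

import java.util.Arrays;

import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;

class CompactionResultPairing {
  static void apply(StripeStoreFileManager manager,
      StoreFile compactedInput, StoreFile compactionOutput) throws Exception {
    // Phase 1: publish the result; the input is only marked compacted away.
    manager.addCompactionResults(
        Arrays.asList(compactedInput), Arrays.asList(compactionOutput));
    // Phase 2: actually remove the input from the manager's state.
    manager.removeCompactedFiles(Arrays.asList(compactedInput));
  }
}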

@ -125,7 +125,7 @@ public class TestWideScanner extends HBaseTestCase {
((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator(); ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
while (scanners.hasNext()) { while (scanners.hasNext()) {
StoreScanner ss = (StoreScanner)scanners.next(); StoreScanner ss = (StoreScanner)scanners.next();
ss.updateReaders(); ss.updateReaders(new ArrayList<StoreFile>());
} }
} while (more); } while (more);

View File

@ -0,0 +1,398 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MediumTests.class, RegionServerTests.class })
public class TestCompactedHFilesDischarger {
private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
private Region region;
private final static byte[] fam = Bytes.toBytes("cf_1");
private final static byte[] qual1 = Bytes.toBytes("qf_1");
private final static byte[] val = Bytes.toBytes("val");
private static CountDownLatch latch = new CountDownLatch(3);
private static AtomicInteger counter = new AtomicInteger(0);
private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
private RegionServerServices rss;
@Before
public void setUp() throws Exception {
TableName tableName = TableName.valueOf(getClass().getSimpleName());
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(fam));
HRegionInfo info = new HRegionInfo(tableName, null, null, false);
Path path = testUtil.getDataTestDir(getClass().getSimpleName());
region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
rss = mock(RegionServerServices.class);
List<Region> regions = new ArrayList<Region>();
regions.add(region);
when(rss.getOnlineRegions()).thenReturn(regions);
}
@After
public void tearDown() throws IOException {
counter.set(0);
scanCompletedCounter.set(0);
latch = new CountDownLatch(3);
HBaseTestingUtility.closeRegionAndWAL(region);
testUtil.cleanupTestDir();
}
@Test
public void testCompactedHFilesCleaner() throws Exception {
// Create the cleaner object
CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
region.put(p);
}
// flush them
region.flush(true);
for (int i = 11; i < 20; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
region.put(p);
}
// flush them
region.flush(true);
for (int i = 21; i < 30; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
region.put(p);
}
// flush them
region.flush(true);
Store store = region.getStore(fam);
assertEquals(3, store.getStorefilesCount());
Collection<StoreFile> storefiles = store.getStorefiles();
Collection<StoreFile> compactedfiles =
((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
// None of the files should be in compacted state.
for (StoreFile file : storefiles) {
assertFalse(file.isCompactedAway());
}
// Try to run the cleaner without compaction; there should not be any change
cleaner.chore();
storefiles = store.getStorefiles();
// None of the files should be in compacted state.
for (StoreFile file : storefiles) {
assertFalse(file.isCompactedAway());
}
// now do some compaction
region.compact(true);
// The flushed files should still be present until the cleaner runs, but they should
// now be marked as compacted away
assertEquals(1, store.getStorefilesCount());
assertEquals(3,
((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
// Run the cleaner
cleaner.chore();
assertEquals(1, store.getStorefilesCount());
storefiles = store.getStorefiles();
for (StoreFile file : storefiles) {
// Should not be in compacted state
assertFalse(file.isCompactedAway());
}
compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
assertTrue(compactedfiles.size() == 0);
}
@Test
public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
// Create the cleaner object
CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
region.put(p);
}
// flush them
region.flush(true);
for (int i = 11; i < 20; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
region.put(p);
}
// flush them
region.flush(true);
for (int i = 21; i < 30; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
region.put(p);
}
// flush them
region.flush(true);
Store store = region.getStore(fam);
assertEquals(3, store.getStorefilesCount());
Collection<StoreFile> storefiles = store.getStorefiles();
Collection<StoreFile> compactedfiles =
((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
// None of the files should be in compacted state.
for (StoreFile file : storefiles) {
assertFalse(file.isCompactedAway());
}
// Do compaction
region.compact(true);
startScannerThreads();
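    // These scanners were opened after the compaction, so each of them references only
    // the new compacted file: its reader refcount is 3 (one per scanner thread) while
    // the compacted-away files have no readers left.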
storefiles = store.getStorefiles();
int usedReaderCount = 0;
int unusedReaderCount = 0;
for (StoreFile file : storefiles) {
if (file.getRefCount() == 3) {
usedReaderCount++;
}
}
compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
for(StoreFile file : compactedfiles) {
assertEquals("Refcount should be 3", 0, file.getRefCount());
unusedReaderCount++;
}
    // The compacted-away files are still present, but no scanner is using them for reads.
assertEquals("unused reader count should be 3", 3, unusedReaderCount);
assertEquals("used reader count should be 1", 1, usedReaderCount);
// now run the cleaner
cleaner.chore();
countDown();
assertEquals(1, store.getStorefilesCount());
storefiles = store.getStorefiles();
for (StoreFile file : storefiles) {
// Should not be in compacted state
assertFalse(file.isCompactedAway());
}
compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
}
@Test
public void testCleanerWithParallelScanners() throws Exception {
// Create the cleaner object
CompactedHFilesDischarger cleaner =
new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
// Add some data to the region and do some flushes
for (int i = 1; i < 10; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
region.put(p);
}
// flush them
region.flush(true);
for (int i = 11; i < 20; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
region.put(p);
}
// flush them
region.flush(true);
for (int i = 21; i < 30; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(fam, qual1, val);
region.put(p);
}
// flush them
region.flush(true);
Store store = region.getStore(fam);
assertEquals(3, store.getStorefilesCount());
Collection<StoreFile> storefiles = store.getStorefiles();
Collection<StoreFile> compactedfiles =
((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
// None of the files should be in compacted state.
for (StoreFile file : storefiles) {
assertFalse(file.isCompactedAway());
}
startScannerThreads();
// Do compaction
region.compact(true);
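    // The scanners were opened before the compaction, so they still hold readers on the
    // three original files; the newly written compacted file is not referenced yet.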
storefiles = store.getStorefiles();
int usedReaderCount = 0;
int unusedReaderCount = 0;
for (StoreFile file : storefiles) {
if (file.getRefCount() == 0) {
unusedReaderCount++;
}
}
compactedfiles =
((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
for(StoreFile file : compactedfiles) {
assertEquals("Refcount should be 3", 3, file.getRefCount());
usedReaderCount++;
}
// The newly compacted file will not be used by any scanner
assertEquals("unused reader count should be 1", 1, unusedReaderCount);
assertEquals("used reader count should be 3", 3, usedReaderCount);
// now run the cleaner
cleaner.chore();
countDown();
// No change in the number of store files as none of the compacted files could be cleaned up
assertEquals(1, store.getStorefilesCount());
assertEquals(3,
((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
while (scanCompletedCounter.get() != 3) {
Thread.sleep(100);
}
// reset
latch = new CountDownLatch(3);
scanCompletedCounter.set(0);
counter.set(0);
// Try creating a new scanner and it should use only the new file created after compaction
startScannerThreads();
storefiles = store.getStorefiles();
usedReaderCount = 0;
unusedReaderCount = 0;
for (StoreFile file : storefiles) {
if (file.getRefCount() == 3) {
usedReaderCount++;
}
}
compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
for(StoreFile file : compactedfiles) {
assertEquals("Refcount should be 0", 0, file.getRefCount());
unusedReaderCount++;
}
    // The compacted-away files are still present, but no scanner is using them for reads.
assertEquals("unused reader count should be 3", 3, unusedReaderCount);
assertEquals("used reader count should be 1", 1, usedReaderCount);
countDown();
while (scanCompletedCounter.get() != 3) {
Thread.sleep(100);
}
// Run the cleaner again
cleaner.chore();
// Now the cleaner should be able to clear it up because there are no active readers
assertEquals(1, store.getStorefilesCount());
storefiles = store.getStorefiles();
for (StoreFile file : storefiles) {
// Should not be in compacted state
assertFalse(file.isCompactedAway());
}
compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.isEmpty());
}
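  /**
   * Counts the latch down three times so that every parked scanner thread is released
   * and can finish its scan.
   */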
protected void countDown() {
// count down 3 times
latch.countDown();
latch.countDown();
latch.countDown();
}
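  /**
   * Starts three scanner threads and blocks until each one has read its first row and is
   * parked on the latch, keeping the current store file readers referenced.
   */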
protected void startScannerThreads() throws InterruptedException {
// Start parallel scan threads
ScanThread[] scanThreads = new ScanThread[3];
for (int i = 0; i < 3; i++) {
scanThreads[i] = new ScanThread((HRegion) region);
}
for (ScanThread thread : scanThreads) {
thread.start();
}
while (counter.get() != 3) {
Thread.sleep(100);
}
}
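  /**
   * Scanner thread that reads a single row, signals readiness via {@code counter} and then
   * waits on {@code latch}, so the store files it references cannot be archived while the
   * main test thread inspects reference counts.
   */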
private static class ScanThread extends Thread {
private final HRegion region;
public ScanThread(HRegion region) {
this.region = region;
}
@Override
public void run() {
try {
initiateScan(region);
} catch (IOException e) {
// do nothing
}
}
private void initiateScan(HRegion region) throws IOException {
Scan scan = new Scan();
scan.setCaching(1);
RegionScanner resScanner = null;
try {
resScanner = region.getScanner(scan);
List<Cell> results = new ArrayList<Cell>();
        // Read the first row, then signal readiness and park on the latch so the
        // store file readers referenced by this scanner stay in use.
        boolean next = resScanner.next(results);
        try {
          counter.incrementAndGet();
          latch.await();
        } catch (InterruptedException e) {
          // Ignore; fall through and finish the scan.
        }
        // Drain any remaining rows; next() returns false once the scan is exhausted.
        while (next) {
          next = resScanner.next(results);
        }
} finally {
scanCompletedCounter.incrementAndGet();
resScanner.close();
}
}
}
}