diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index ac76edbdeae..a7759c53c78 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -265,7 +265,14 @@ public enum EventType {
*
* RS_REGION_REPLICA_FLUSH
*/
- RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS);
+ RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS),
+
+ /**
+ * RS compacted files discharger
+ *
+ * RS_COMPACTED_FILES_DISCHARGER
+ */
+ RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER);
private final int code;
private final ExecutorType executor;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index d0f6beedbb6..5a161497b91 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -46,7 +46,8 @@ public enum ExecutorType {
RS_CLOSE_META (25),
RS_PARALLEL_SEEK (26),
RS_LOG_REPLAY_OPS (27),
- RS_REGION_REPLICA_FLUSH_OPS (28);
+ RS_REGION_REPLICA_FLUSH_OPS (28),
+ RS_COMPACTED_FILES_DISCHARGER (29);
ExecutorType(int value) {}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 42cca2bcaeb..018e173f2d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -86,7 +87,8 @@ public class ExecutorService {
* started with the same name, this throws a RuntimeException.
* @param name Name of the service to start.
*/
- void startExecutorService(String name, int maxThreads) {
+ @VisibleForTesting
+ public void startExecutorService(String name, int maxThreads) {
if (this.executorMap.get(name) != null) {
throw new RuntimeException("An executor service with the name " + name +
" is already running!");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index 36b75597064..0bc75e7d078 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -33,5 +34,5 @@ public interface ChangedReadersObserver {
* Notify observers.
* @throws IOException e
*/
- void updateReaders() throws IOException;
+ void updateReaders(List<StoreFile> sfs) throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java
new file mode 100644
index 00000000000..02160d8fa6c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischargeHandler.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+
+/**
+ * Event handler that handles the removal and archival of the compacted hfiles
+ */
+@InterfaceAudience.Private
+public class CompactedHFilesDischargeHandler extends EventHandler {
+
+ private HStore store;
+
+ public CompactedHFilesDischargeHandler(Server server, EventType eventType, HStore store) {
+ super(server, eventType);
+ this.store = store;
+ }
+
+ @Override
+ public void process() throws IOException {
+ this.store.closeAndArchiveCompactedFiles();
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
new file mode 100644
index 00000000000..9f6c65c175e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactedHFilesDischarger.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.executor.EventType;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A chore service that periodically cleans up compacted files once there are no active readers
+ * using them, and also evicts the corresponding entries from the block cache.
+ */
+@InterfaceAudience.Private
+public class CompactedHFilesDischarger extends ScheduledChore {
+ private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
+ private RegionServerServices regionServerServices;
+ // Default is to use executor
+ @VisibleForTesting
+ private boolean useExecutor = true;
+
+ /**
+ * @param period the period of time to sleep between each run
+ * @param stopper the stopper
+ * @param regionServerServices the region server that starts this chore
+ */
+ public CompactedHFilesDischarger(final int period, final Stoppable stopper,
+ final RegionServerServices regionServerServices) {
+ // Need to add the config classes
+ super("CompactedHFilesCleaner", stopper, period);
+ this.regionServerServices = regionServerServices;
+ }
+
+ /**
+ * @param period the period of time to sleep between each run
+ * @param stopper the stopper
+ * @param regionServerServices the region server that starts this chore
+ * @param useExecutor true to use the region server's executor service, false otherwise
+ */
+ @VisibleForTesting
+ public CompactedHFilesDischarger(final int period, final Stoppable stopper,
+ final RegionServerServices regionServerServices, boolean useExecutor) {
+ // Need to add the config classes
+ this(period, stopper, regionServerServices);
+ this.useExecutor = useExecutor;
+ }
+
+ @Override
+ public void chore() {
+ if (regionServerServices == null) return;
+ List<Region> onlineRegions = regionServerServices.getOnlineRegions();
+ if (onlineRegions != null) {
+ for (Region region : onlineRegions) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Started the compacted hfiles cleaner for the region " + region.getRegionInfo());
+ }
+ for (Store store : region.getStores()) {
+ try {
+ if (useExecutor && regionServerServices != null) {
+ CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
+ (Server) regionServerServices, EventType.RS_COMPACTED_FILES_DISCHARGER,
+ (HStore) store);
+ regionServerServices.getExecutorService().submit(handler);
+ } else {
+ // call synchronously if the RegionServerServices are not
+ // available
+ store.closeAndArchiveCompactedFiles();
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Completed archiving the compacted files for the region "
+ + region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while trying to close and archive the comapcted store "
+ + "files of the store " + store.getColumnFamilyName() + " in the" + " region "
+ + region.getRegionInfo(), e);
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Completed the compacted hfiles cleaner for the region " + region.getRegionInfo());
+ }
+ }
+ }
+ }
+}
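A minimal sketch of how this chore can be driven by hand in a test, assuming Mockito is available (static imports of mock/when), java.util.Collections is imported, and a Region named region has been created elsewhere (for example via HBaseTestingUtility). Passing useExecutor=false makes the chore archive synchronously instead of submitting CompactedHFilesDischargeHandler events to the region server's executor:

    RegionServerServices rss = mock(RegionServerServices.class);
    Stoppable stopper = mock(Stoppable.class);
    when(rss.getOnlineRegions()).thenReturn(Collections.<Region>singletonList(region));
    // The 1000 ms period is irrelevant here because chore() is invoked directly.
    CompactedHFilesDischarger discharger =
        new CompactedHFilesDischarger(1000, stopper, rss, false);
    discharger.chore();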
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index 6000f01acc7..66112029095 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -54,6 +54,13 @@ class DefaultStoreFileManager implements StoreFileManager {
* is atomically replaced when its contents change.
*/
private volatile ImmutableList<StoreFile> storefiles = null;
+ /**
+ * List of compacted files inside this store that need to be excluded in reads
+ * because further new reads will be using only the newly created files out of compaction.
+ * These compacted files will be deleted/cleared once all the existing readers on these
+ * compacted files are done.
+ */
+ private volatile List<StoreFile> compactedfiles = null;
public DefaultStoreFileManager(KVComparator kvComparator, Configuration conf,
CompactionConfiguration comConf) {
@@ -74,6 +81,11 @@ class DefaultStoreFileManager implements StoreFileManager {
return storefiles;
}
+ @Override
+ public Collection<StoreFile> getCompactedfiles() {
+ return compactedfiles;
+ }
+
@Override
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
@@ -88,6 +100,13 @@ class DefaultStoreFileManager implements StoreFileManager {
return result;
}
+ @Override
+ public Collection<StoreFile> clearCompactedFiles() {
+ List<StoreFile> result = compactedfiles;
+ compactedfiles = new ArrayList<StoreFile>();
+ return result;
+ }
+
@Override
public final int getStorefileCount() {
return storefiles.size();
@@ -95,13 +114,42 @@ class DefaultStoreFileManager implements StoreFileManager {
@Override
public void addCompactionResults(
- Collection<StoreFile> compactedFiles, Collection<StoreFile> results) {
+ Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {
ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
- newStoreFiles.removeAll(compactedFiles);
+ newStoreFiles.removeAll(newCompactedfiles);
if (!results.isEmpty()) {
newStoreFiles.addAll(results);
}
sortAndSetStoreFiles(newStoreFiles);
+ ArrayList<StoreFile> updatedCompactedfiles = null;
+ if (this.compactedfiles != null) {
+ updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles);
+ updatedCompactedfiles.addAll(newCompactedfiles);
+ } else {
+ updatedCompactedfiles = new ArrayList<StoreFile>(newCompactedfiles);
+ }
+ markCompactedAway(newCompactedfiles);
+ this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
+ }
+
+ // Mark the files as compactedAway once the storefiles and compactedfiles lists are finalized.
+ // A background thread will close the actual readers on these compacted files and also
+ // evict their blocks from the block cache so that they are no longer cached.
+ private void markCompactedAway(Collection<StoreFile> compactedFiles) {
+ for (StoreFile file : compactedFiles) {
+ file.markCompactedAway();
+ }
+ }
+
+ @Override
+ public void removeCompactedFiles(Collection<StoreFile> removedCompactedfiles) throws IOException {
+ ArrayList<StoreFile> updatedCompactedfiles = null;
+ if (this.compactedfiles != null) {
+ updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles);
+ updatedCompactedfiles.removeAll(removedCompactedfiles);
+ this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
+ }
}
@Override
@@ -166,6 +214,12 @@ class DefaultStoreFileManager implements StoreFileManager {
storefiles = ImmutableList.copyOf(storeFiles);
}
+ private List<StoreFile> sortCompactedfiles(List<StoreFile> storefiles) {
+ // Sorting may not be really needed here for the compacted files?
+ Collections.sort(storefiles, StoreFile.Comparators.SEQ_ID);
+ return new ArrayList<StoreFile>(storefiles);
+ }
+
@Override
public double getCompactionPressure() {
int storefileCount = getStorefileCount();
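The compacted-files bookkeeping added above follows a simple lifecycle; a hedged sketch, assuming a DefaultStoreFileManager instance named manager and that the compaction inputs and outputs are in scope (inside a method that may throw IOException):

    // inputs: the files that were just compacted; outputs: the files the compaction produced
    manager.addCompactionResults(inputs, outputs);  // inputs are marked compactedAway and moved
                                                    // onto the compactedfiles list
    Collection<StoreFile> pending = manager.getCompactedfiles();  // still tracked, excluded from reads
    // ... later, once no scanner references the inputs, the discharger archives them and then
    // drops them from the manager:
    manager.removeCompactedFiles(inputs);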
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 899f2fcf2dd..1c3583298ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1533,6 +1533,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (this.metricsRegionWrapper != null) {
Closeables.closeQuietly(this.metricsRegionWrapper);
}
+
status.markComplete("Closed");
LOG.info("Closed " + this);
return result;
@@ -6786,6 +6787,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
dstRegion.getRegionFileSystem().logFileSystemState(LOG);
}
+ // clear the compacted files if any
+ for (Store s : dstRegion.getStores()) {
+ s.closeAndArchiveCompactedFiles();
+ }
if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
throw new IOException("Merged region " + dstRegion
+ " still has references after the compaction, is compaction canceled?");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7351fa8996e..7287f7852ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -133,6 +133,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -479,6 +480,8 @@ public class HRegionServer extends HasThread implements
*/
protected final ConfigurationManager configurationManager;
+ private CompactedHFilesDischarger compactedFileDischarger;
+
/**
* Starts a HRegionServer at the default location.
* @param conf
@@ -611,6 +614,16 @@ public class HRegionServer extends HasThread implements
}
});
}
+ // Create the CompactedFileDischarger chore. This chore helps to remove the compacted files
+ // that will no longer be used in reads.
+ // The default interval is 2 mins; the TTLCleaner defaults to 5 mins, so 2 mins lets the
+ // compacted files be archived before the TTLCleaner runs.
+ int cleanerInterval = conf
+ .getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_INTERVAL, 2 * 60 * 1000);
+ this.compactedFileDischarger =
+ new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this);
+ choreService.scheduleChore(compactedFileDischarger);
}
/*
@@ -1708,7 +1721,9 @@ public class HRegionServer extends HasThread implements
}
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
-
+ // Start the threads for compacted files discharger
+ this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
+ conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
@@ -2732,6 +2747,15 @@ public class HRegionServer extends HasThread implements
return coprocessors.toArray(new String[coprocessors.size()]);
}
+ @Override
+ public List<Region> getOnlineRegions() {
+ List<Region> allRegions = new ArrayList<Region>();
+ synchronized (this.onlineRegions) {
+ // Return a clone copy of the onlineRegions
+ allRegions.addAll(onlineRegions.values());
+ }
+ return allRegions;
+ }
/**
* Try to close the region, logs a warning on failure but continues.
* @param region Region to close
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index a19407a8666..cb62c95ce32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
@@ -273,7 +272,6 @@ public class HStore implements Store {
"hbase.hstore.flush.retries.number must be > 0, not "
+ flushRetriesNumber);
}
-
// Crypto context for new store files
String cipherName = family.getEncryptionType();
if (cipherName != null) {
@@ -531,14 +529,15 @@ public class HStore implements Store {
try {
Future<StoreFile> future = completionService.take();
StoreFile storeFile = future.get();
- long length = storeFile.getReader().length();
- this.storeSize += length;
- this.totalUncompressedBytes +=
- storeFile.getReader().getTotalUncompressedBytes();
- if (LOG.isDebugEnabled()) {
- LOG.debug("loaded " + storeFile.toStringDetailed());
+ if (storeFile != null) {
+ long length = storeFile.getReader().length();
+ this.storeSize += length;
+ this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("loaded " + storeFile.toStringDetailed());
+ }
+ results.add(storeFile);
}
- results.add(storeFile);
} catch (InterruptedException e) {
if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
@@ -636,8 +635,7 @@ public class HStore implements Store {
region.getMVCC().advanceTo(this.getMaxSequenceId());
}
- // notify scanners, close file readers, and recompute store size
- completeCompaction(toBeRemovedStoreFiles, false);
+ completeCompaction(toBeRemovedStoreFiles);
}
private StoreFile createStoreFileAndReader(final Path p) throws IOException {
@@ -814,7 +812,6 @@ public class HStore implements Store {
// the lock.
this.lock.writeLock().unlock();
}
- notifyChangedReadersObservers();
LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
if (LOG.isTraceEnabled()) {
String traceMessage = "BULK LOAD time,size,store size,store files ["
@@ -830,7 +827,12 @@ public class HStore implements Store {
try {
// Clear so metrics doesn't find them.
ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
-
+ Collection<StoreFile> compactedfiles =
+ storeEngine.getStoreFileManager().clearCompactedFiles();
+ // clear the compacted files
+ if (compactedfiles != null && !compactedfiles.isEmpty()) {
+ removeCompactedfiles(compactedfiles);
+ }
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
ThreadPoolExecutor storeFileCloserThreadPool = this.region
@@ -845,7 +847,7 @@ public class HStore implements Store {
@Override
public Void call() throws IOException {
boolean evictOnClose =
- cacheConf != null? cacheConf.shouldEvictOnClose(): true;
+ cacheConf != null? cacheConf.shouldEvictOnClose(): true;
f.closeReader(evictOnClose);
return null;
}
@@ -1067,10 +1069,8 @@ public class HStore implements Store {
// the lock.
this.lock.writeLock().unlock();
}
-
- // Tell listeners of the change in readers.
- notifyChangedReadersObservers();
-
+ // Notify the changed-readers observers here - this now happens only on flushes.
+ notifyChangedReadersObservers(sfs);
if (LOG.isTraceEnabled()) {
long totalSize = 0;
for (StoreFile sf : sfs) {
@@ -1088,9 +1088,9 @@ public class HStore implements Store {
* Notify all observers that set of Readers has changed.
* @throws IOException
*/
- private void notifyChangedReadersObservers() throws IOException {
- for (ChangedReadersObserver o: this.changedReaderObservers) {
- o.updateReaders();
+ private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
+ for (ChangedReadersObserver o : this.changedReaderObservers) {
+ o.updateReaders(sfs);
}
}
@@ -1129,6 +1129,30 @@ public class HStore implements Store {
return scanners;
}
+ @Override
+ public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
+ boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+ byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
+ List<KeyValueScanner> memStoreScanners = null;
+ if (includeMemstoreScanner) {
+ this.lock.readLock().lock();
+ try {
+ memStoreScanners = this.memstore.getScanners(readPt);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+ List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(files,
+ cacheBlocks, usePread, isCompaction, false, matcher, readPt, isPrimaryReplicaStore());
+ List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
+ scanners.addAll(sfScanners);
+ // Then the memstore scanners
+ if (memStoreScanners != null) {
+ scanners.addAll(memStoreScanners);
+ }
+ return scanners;
+ }
+
@Override
public void addChangedReaderObserver(ChangedReadersObserver o) {
this.changedReaderObservers.add(o);
@@ -1248,7 +1272,7 @@ public class HStore implements Store {
compactedCellsSize += getCompactionProgress().totalCompactedSize;
}
// At this point the store will use new files for all new scanners.
- completeCompaction(filesToCompact, true); // Archive old files & update store size.
+ completeCompaction(filesToCompact); // update store size.
logCompactionEndMessage(cr, sfs, compactionStartTime);
return sfs;
@@ -1436,7 +1460,7 @@ public class HStore implements Store {
LOG.info("Replaying compaction marker, replacing input files: " +
inputStoreFiles + " with output files : " + outputStoreFiles);
this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
- this.completeCompaction(inputStoreFiles, removeFiles);
+ this.completeCompaction(inputStoreFiles);
}
}
@@ -1488,7 +1512,7 @@ public class HStore implements Store {
this.getCoprocessorHost().postCompact(this, sf, null);
}
replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
- completeCompaction(filesToCompact, true);
+ completeCompaction(filesToCompact);
}
} finally {
synchronized (filesCompacting) {
@@ -1771,32 +1795,7 @@ public class HStore implements Store {
@VisibleForTesting
protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
throws IOException {
- try {
- // Do not delete old store files until we have sent out notification of
- // change in case old files are still being accessed by outstanding scanners.
- // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
- // scenario that could have happened if continue to hold the lock.
- notifyChangedReadersObservers();
- // At this point the store will use new files for all scanners.
-
- // let the archive util decide if we should archive or delete the files
- LOG.debug("Removing store files after compaction...");
- boolean evictOnClose =
- cacheConf != null? cacheConf.shouldEvictOnClose(): true;
- for (StoreFile compactedFile : compactedFiles) {
- compactedFile.closeReader(evictOnClose);
- }
- if (removeFiles) {
- this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
- }
- } catch (IOException e) {
- e = RemoteExceptionHandler.checkIOException(e);
- LOG.error("Failed removing compacted files in " + this +
- ". Files we were trying to remove are " + compactedFiles.toString() +
- "; some of them may have been already removed", e);
- }
-
- // 4. Compute new store size
+ LOG.debug("Completing compaction...");
this.storeSize = 0L;
this.totalUncompressedBytes = 0L;
for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
@@ -2490,4 +2489,92 @@ public class HStore implements Store {
public boolean isPrimaryReplicaStore() {
return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
}
+
+ @Override
+ public void closeAndArchiveCompactedFiles() throws IOException {
+ lock.readLock().lock();
+ Collection<StoreFile> copyCompactedfiles = null;
+ try {
+ Collection<StoreFile> compactedfiles =
+ this.getStoreEngine().getStoreFileManager().getCompactedfiles();
+ if (compactedfiles != null && compactedfiles.size() != 0) {
+ // Do a copy under read lock
+ copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("No compacted files to archive");
+ }
+ return;
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
+ removeCompactedfiles(copyCompactedfiles);
+ }
+ }
+
+ /**
+ * Archives and removes the compacted files
+ * @param compactedfiles The compacted files in this store that are not active in reads
+ * @throws IOException
+ */
+ private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
+ throws IOException {
+ final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
+ for (final StoreFile file : compactedfiles) {
+ synchronized (file) {
+ try {
+ StoreFile.Reader r = file.getReader();
+ if (r == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The file " + file + " was closed but still not archived.");
+ }
+ filesToRemove.add(file);
+ }
+ if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+ // Even if deleting fails we need not bother as any new scanners won't be
+ // able to use the compacted file as the status is already compactedAway
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closing and archiving the file " + file.getPath());
+ }
+ r.close(true);
+ // Just close and return
+ filesToRemove.add(file);
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while trying to close the compacted store file "
+ + file.getPath().getName(), e);
+ }
+ }
+ }
+ if (this.isPrimaryReplicaStore()) {
+ // Only the primary region is allowed to move the file to archive.
+ // The secondary region does not move the files to archive. Any active reads from
+ // the secondary region will still work because the file as such has active readers on it.
+ if (!filesToRemove.isEmpty()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Moving the files " + filesToRemove + " to archive");
+ }
+ // Only if this is successful it has to be removed
+ this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove);
+ }
+ }
+ if (!filesToRemove.isEmpty()) {
+ // Clear the compactedfiles from the store file manager
+ clearCompactedfiles(filesToRemove);
+ }
+ }
+
+ private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
+ }
+ lock.writeLock().lock();
+ try {
+ this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
}
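Putting the HStore pieces together: compaction no longer archives its input files inline; archival happens later, once readers drain. A hedged end-to-end sketch in test style, assuming an HRegion named region that already has flushed data (both calls may throw IOException):

    region.compact(true);                        // major compaction; inputs become compactedAway
    for (Store store : region.getStores()) {
      // Closes readers whose ref count has dropped to zero and, on the primary replica,
      // moves the corresponding files to the archive directory.
      store.closeAndArchiveCompactedFiles();
    }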
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
index 60fc9fb8ade..310108c0dc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
@@ -67,4 +67,10 @@ public interface OnlineRegions extends Server {
* @throws java.io.IOException
*/
List<Region> getOnlineRegions(TableName tableName) throws IOException;
+
+ /**
+ * Get all online regions in this RS.
+ * @return List of online Region
+ */
+ List<Region> getOnlineRegions();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index e319f909d6c..0f12b0af899 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -124,24 +124,15 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
- lock.lock();
- try {
- checkReseek();
- return this.heap.seekToPreviousRow(key);
- } finally {
- lock.unlock();
- }
-
+ boolean flushed = checkFlushed();
+ checkReseek(flushed);
+ return this.heap.seekToPreviousRow(key);
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
- lock.lock();
- try {
- checkReseek();
- return this.heap.backwardSeek(key);
- } finally {
- lock.unlock();
- }
+ boolean flushed = checkFlushed();
+ checkReseek(flushed);
+ return this.heap.backwardSeek(key);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 9f1752626a8..ddcd4e93da8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -105,6 +105,25 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
byte[] stopRow,
long readPt
) throws IOException;
+
+ /**
+ * Create scanners on the given files and, if needed, on the memstore with no filtering based
+ * on TTL (that happens further down the line).
+ * @param files the list of files on which the scanners have to be created
+ * @param cacheBlocks cache the blocks or not
+ * @param isGet true if it is get, false if not
+ * @param usePread true to use pread, false if not
+ * @param isCompaction true if the scanner is created for compaction
+ * @param matcher the scan query matcher
+ * @param startRow the start row
+ * @param stopRow the stop row
+ * @param readPt the read point of the current scan
+ * @param includeMemstoreScanner true if memstore has to be included
+ * @return scanners on the given files and on the memstore if specified
+ */
+ List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks, boolean isGet,
+ boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+ byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException;
ScanInfo getScanInfo();
@@ -480,4 +499,9 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException;
boolean isPrimaryReplicaStore();
+
+ /**
+ * Closes and archives the compacted files under this store
+ */
+ void closeAndArchiveCompactedFiles() throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 46a64f26654..57a272e5f3e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -30,6 +30,8 @@ import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -368,6 +371,19 @@ public class StoreFile {
return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
}
+ @VisibleForTesting
+ public boolean isCompactedAway() {
+ if (this.reader != null) {
+ return this.reader.isCompactedAway();
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ public int getRefCount() {
+ return this.reader.refCount.get();
+ }
+
/**
* Return the timestamp at which this bulk load file was generated.
*/
@@ -536,6 +552,15 @@ public class StoreFile {
}
}
+ /**
+ * Marks the status of the file as compactedAway.
+ */
+ public void markCompactedAway() {
+ if (this.reader != null) {
+ this.reader.markCompactedAway();
+ }
+ }
+
/**
* Delete this file
* @throws IOException
@@ -1072,6 +1097,12 @@ public class StoreFile {
private byte[] lastBloomKey;
private long deleteFamilyCnt = -1;
private boolean bulkLoadResult = false;
+ // Counter that is incremented every time a scanner is created on the
+ // store file. It is decremented when the scan on the store file is
+ // done.
+ private AtomicInteger refCount = new AtomicInteger(0);
+ // Indicates if the file got compacted
+ private volatile boolean compactedAway = false;
public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
throws IOException {
@@ -1079,6 +1110,10 @@ public class StoreFile {
bloomFilterType = BloomType.NONE;
}
+ void markCompactedAway() {
+ this.compactedAway = true;
+ }
+
public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
CacheConfig cacheConf, Configuration conf) throws IOException {
reader = HFile.createReader(fs, path, in, size, cacheConf, conf);
@@ -1130,11 +1165,35 @@ public class StoreFile {
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
boolean pread,
boolean isCompaction, long readPt) {
+ // Increment the ref count
+ refCount.incrementAndGet();
return new StoreFileScanner(this,
getScanner(cacheBlocks, pread, isCompaction),
!isCompaction, reader.hasMVCCInfo(), readPt);
}
+ /**
+ * Decrement the ref count associated with the reader whenever a scanner associated
+ * with the reader is closed
+ */
+ void decrementRefCount() {
+ refCount.decrementAndGet();
+ }
+
+ /**
+ * @return true if the file is still used in reads
+ */
+ public boolean isReferencedInReads() {
+ return refCount.get() != 0;
+ }
+
+ /**
+ * @return true if the file is compacted
+ */
+ public boolean isCompactedAway() {
+ return this.compactedAway;
+ }
+
/**
* Warning: Do not write further code which depends on this call. Instead
* use getStoreFileScanner() which uses the StoreFileScanner class/interface
@@ -1620,7 +1679,13 @@ public class StoreFile {
private static class GetFileSize implements Function<StoreFile, Long> {
@Override
public Long apply(StoreFile sf) {
- return sf.getReader().length();
+ if (sf.getReader() != null) {
+ return sf.getReader().length();
+ } else {
+ // the reader may be null for the compacted files and if the archiving
+ // had failed.
+ return -1L;
+ }
}
}
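The new ref count is the handshake between scanners and the discharger: getStoreFileScanner() increments it, StoreFileScanner.close() decrements it, and removeCompactedfiles() only archives a file whose reader is compactedAway and no longer referenced in reads. A small fragment illustrating the contract, assuming an open StoreFile.Reader named reader and a read point readPt obtained elsewhere:

    StoreFileScanner scanner =
        reader.getStoreFileScanner(true /*cacheBlocks*/, false /*pread*/, false /*isCompaction*/, readPt);
    assert reader.isReferencedInReads();   // ref count is now 1, so the discharger skips this file
    scanner.close();                       // drops the ref count back to zero
    assert !reader.isReferencedInReads();  // eligible for archival once markCompactedAway() has run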
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 11993db919b..7e7054766a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -53,19 +53,33 @@ public interface StoreFileManager {
void insertNewFiles(Collection sfs) throws IOException;
/**
- * Adds compaction results into the structure.
+ * Adds only the new compaction results into the structure.
* @param compactedFiles The input files for the compaction.
* @param results The resulting files for the compaction.
*/
void addCompactionResults(
Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException;
+ /**
+ * Remove the compacted files
+ * @param compactedFiles the list of compacted files
+ * @throws IOException
+ */
+ void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException;
+
/**
* Clears all the files currently in use and returns them.
* @return The files previously in use.
*/
ImmutableCollection<StoreFile> clearFiles();
+ /**
+ * Clears all the compacted files and returns them. This method is expected to be
+ * accessed single threaded.
+ * @return The files compacted previously.
+ */
+ Collection<StoreFile> clearCompactedFiles();
+
/**
* Gets the snapshot of the store files currently in use. Can be used for things like metrics
* and checks; should not assume anything about relations between store files in the list.
@@ -73,6 +87,15 @@ public interface StoreFileManager {
*/
Collection getStorefiles();
+ /**
+ * List of compacted files inside this store that need to be excluded in reads
+ * because further new reads will be using only the newly created files out of compaction.
+ * These compacted files will be deleted/cleared once all the existing readers on these
+ * compacted files are done.
+ * @return the list of compacted files
+ */
+ Collection<StoreFile> getCompactedfiles();
+
/**
* Returns the number of files currently in use.
* @return The number of files.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 1d2f7e5ac3f..40551889fbc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -51,6 +51,7 @@ public class StoreFileScanner implements KeyValueScanner {
private final StoreFile.Reader reader;
private final HFileScanner hfs;
private Cell cur = null;
+ private boolean closed = false;
private boolean realSeekDone;
private boolean delayedReseek;
@@ -171,7 +172,7 @@ public class StoreFileScanner implements KeyValueScanner {
try {
try {
if(!seekAtOrAfter(hfs, key)) {
- close();
+ this.cur = null;
return false;
}
@@ -198,7 +199,7 @@ public class StoreFileScanner implements KeyValueScanner {
try {
try {
if (!reseekAtOrAfter(hfs, key)) {
- close();
+ this.cur = null;
return false;
}
setCurrentCell(hfs.getKeyValue());
@@ -244,7 +245,6 @@ public class StoreFileScanner implements KeyValueScanner {
}
if (cur == null) {
- close();
return false;
}
@@ -252,8 +252,12 @@ public class StoreFileScanner implements KeyValueScanner {
}
public void close() {
- // Nothing to close on HFileScanner?
cur = null;
+ if (closed) return;
+ if (this.reader != null) {
+ this.reader.decrementRefCount();
+ }
+ closed = true;
}
/**
@@ -454,7 +458,7 @@ public class StoreFileScanner implements KeyValueScanner {
if (seekCount != null) seekCount.incrementAndGet();
if (!hfs.seekBefore(seekKey.getBuffer(), seekKey.getKeyOffset(),
seekKey.getKeyLength())) {
- close();
+ this.cur = null;
return false;
}
KeyValue firstKeyOfPreviousRow = KeyValueUtil.createFirstOnRow(hfs.getKeyValue()
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index a026a407da5..ddf0721d652 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -121,7 +121,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// A flag whether use pread for scan
private boolean scanUsePread = false;
- protected ReentrantLock lock = new ReentrantLock();
+ // Indicates whether there was flush during the course of the scan
+ private volatile boolean flushed = false;
+ // generally we get one file from a flush
+ private List<StoreFile> flushedStoreFiles = new ArrayList<StoreFile>(1);
+ // The current list of scanners
+ private List<KeyValueScanner> currentScanners = new ArrayList<KeyValueScanner>();
+ // flush update lock
+ private ReentrantLock flushLock = new ReentrantLock();
private final long readPt;
@@ -166,6 +173,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
}
+ protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
+ this.currentScanners.addAll(scanners);
+ }
/**
* Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
* are not in a compaction.
@@ -203,7 +213,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// set rowOffset
this.storeOffset = scan.getRowOffsetPerColumnFamily();
-
+ addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());
}
@@ -260,7 +270,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
-
+ addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(scanners, store.getComparator());
}
@@ -299,6 +309,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Seek all scanners to the initial key
seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
+ addCurrentScanners(scanners);
resetKVHeap(scanners, scanInfo.getComparator());
}
@@ -392,6 +403,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (kvs.shouldUseScanner(scan, store, expiredTimestampCutoff)) {
scanners.add(kvs);
+ } else {
+ kvs.close();
}
}
return scanners;
@@ -399,15 +412,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public Cell peek() {
- lock.lock();
- try {
if (this.heap == null) {
return this.lastTop;
}
return this.heap.peek();
- } finally {
- lock.unlock();
- }
}
@Override
@@ -418,8 +426,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public void close() {
- lock.lock();
- try {
if (this.closing) return;
this.closing = true;
// Under test, we dont have a this.store
@@ -429,21 +435,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.heap.close();
this.heap = null; // CLOSED!
this.lastTop = null; // If both are null, we are closed.
- } finally {
- lock.unlock();
- }
}
@Override
public boolean seek(Cell key) throws IOException {
- lock.lock();
- try {
+ boolean flushed = checkFlushed();
// reset matcher state, in case that underlying store changed
- checkReseek();
+ checkReseek(flushed);
return this.heap.seek(key);
- } finally {
- lock.unlock();
- }
}
@Override
@@ -459,13 +458,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
*/
@Override
public boolean next(List outResult, ScannerContext scannerContext) throws IOException {
- lock.lock();
-
- try {
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
}
- if (checkReseek()) {
+ boolean flushed = checkFlushed();
+ if (checkReseek(flushed)) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
@@ -643,9 +640,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// No more keys
close();
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
- } finally {
- lock.unlock();
- }
}
/*
@@ -682,9 +676,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Implementation of ChangedReadersObserver
@Override
- public void updateReaders() throws IOException {
- lock.lock();
+ public void updateReaders(List<StoreFile> sfs) throws IOException {
+ flushed = true;
+ flushLock.lock();
try {
+ flushedStoreFiles.addAll(sfs);
+ } finally {
+ flushLock.unlock();
+ }
+ }
+
+ // Tears down the current heap and remembers its top so the scanner stack can be rebuilt lazily
+ protected void nullifyCurrentHeap() throws IOException {
if (this.closing) return;
// All public synchronized API calls will call 'checkReseek' which will cause
@@ -695,7 +698,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (this.heap == null) return;
// this could be null.
- this.lastTop = this.peek();
+ this.lastTop = this.heap.peek();
//DebugPrint.println("SS updateReaders, topKey = " + lastTop);
@@ -704,18 +707,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
// Let the next() call handle re-creating and seeking
- } finally {
- lock.unlock();
- }
}
/**
+ * @param flushed indicates if there was a flush
* @return true if top of heap has changed (and KeyValueHeap has to try the
* next KV)
* @throws IOException
*/
- protected boolean checkReseek() throws IOException {
- if (this.heap == null && this.lastTop != null) {
+ protected boolean checkReseek(boolean flushed) throws IOException {
+ if (flushed && this.lastTop != null) {
resetScannerStack(this.lastTop);
if (this.heap.peek() == null
|| store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
@@ -731,21 +732,37 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
protected void resetScannerStack(Cell lastTopKey) throws IOException {
- if (heap != null) {
- throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
- }
-
/* When we have the scan object, should we not pass it to getScanners()
* to get a limited set of scanners? We did so in the constructor and we
- * could have done it now by storing the scan object from the constructor */
- List scanners = getScannersNoCompaction();
+ * could have done it now by storing the scan object from the constructor
+ */
- // Seek all scanners to the initial key
+ final boolean isCompaction = false;
+ boolean usePread = get || scanUsePread;
+ List<KeyValueScanner> scanners = null;
+ flushLock.lock();
+ try {
+ scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread,
+ isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
+ // Clear the current set of flushed store files so that they don't get added again
+ flushedStoreFiles.clear();
+ } finally {
+ flushLock.unlock();
+ }
+
+ // Seek the new scanners to the last key
seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
-
+ // remove the older memstore scanner
+ for (int i = 0; i < currentScanners.size(); i++) {
+ if (!currentScanners.get(i).isFileScanner()) {
+ currentScanners.remove(i);
+ break;
+ }
+ }
+ // add the newly created scanners on the flushed files and the current active memstore scanner
+ addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
- resetKVHeap(scanners, store.getComparator());
-
+ resetKVHeap(this.currentScanners, store.getComparator());
// Reset the state of the Query Matcher and set to top row.
// Only reset and call setRow if the row changes; avoids confusing the
// query matcher if scanning intra-row.
@@ -796,19 +813,36 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean reseek(Cell kv) throws IOException {
- lock.lock();
- try {
- //Heap will not be null, if this is called from next() which.
- //If called from RegionScanner.reseek(...) make sure the scanner
- //stack is reset if needed.
- checkReseek();
+ boolean flushed = checkFlushed();
+ // Heap will not be null if this is called from next().
+ // If called from RegionScanner.reseek(...) make sure the scanner
+ // stack is reset if needed.
+ checkReseek(flushed);
if (explicitColumnQuery && lazySeekEnabledGlobally) {
return heap.requestSeek(kv, true, useRowColBloom);
}
return heap.reseek(kv);
- } finally {
- lock.unlock();
+ }
+
+ protected boolean checkFlushed() {
+ // Check the flag without taking any lock. Even if we read a stale value here it is ok to
+ // continue, because we will not reset the heap but simply continue with the referenced
+ // memstore snapshot. For compactions we do not need updateReaders() at all, as we keep
+ // reading from the older files.
+ if (flushed) {
+ // If there is a flush and the current scan is notified on the flush ensure that the
+ // scan's heap gets reset and we do a seek on the newly flushed file.
+ if(!this.closing) {
+ this.lastTop = this.peek();
+ } else {
+ return false;
+ }
+ // reset the flag
+ flushed = false;
+ return true;
}
+ return false;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index 1bbaefe229e..5b797909b64 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -104,6 +104,7 @@ public class StripeStoreFileManager
/** Cached list of all files in the structure, to return from some calls */
public ImmutableList<StoreFile> allFilesCached = ImmutableList.of();
+ private ImmutableList<StoreFile> allCompactedFilesCached = ImmutableList.of();
}
private State state = null;
@@ -138,9 +139,15 @@ public class StripeStoreFileManager
return state.allFilesCached;
}
+ @Override
+ public Collection<StoreFile> getCompactedfiles() {
+ return state.allCompactedFilesCached;
+ }
+
@Override
public void insertNewFiles(Collection sfs) throws IOException {
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
+ // Passing null does not cause NPE??
cmc.mergeResults(null, sfs);
debugDumpState("Added new files");
}
@@ -154,6 +161,13 @@ public class StripeStoreFileManager
return result;
}
+ @Override
+ public ImmutableCollection<StoreFile> clearCompactedFiles() {
+ ImmutableCollection<StoreFile> result = state.allCompactedFilesCached;
+ this.state = new State();
+ return result;
+ }
+
@Override
public int getStorefileCount() {
return state.allFilesCached.size();
@@ -304,9 +318,31 @@ public class StripeStoreFileManager
// copies and apply the result at the end.
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
cmc.mergeResults(compactedFiles, results);
+ markCompactedAway(compactedFiles);
debugDumpState("Merged compaction results");
}
+ // Mark the files as compactedAway once the storefiles and compactedfiles lists are finalized.
+ // A background thread will close the actual readers on these compacted files and also
+ // evict their blocks from the block cache so that they are no longer cached.
+ private void markCompactedAway(Collection<StoreFile> compactedFiles) {
+ for (StoreFile file : compactedFiles) {
+ file.markCompactedAway();
+ }
+ }
+
+ @Override
+ public void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException {
+ // See class comment for the assumptions we make here.
+ LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
+ // In order to be able to fail in the middle of the operation, we'll operate on lazy
+ // copies and apply the result at the end.
+ CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
+ cmc.deleteResults(compactedFiles);
+ debugDumpState("Deleted compaction results");
+ }
+
@Override
public int getStoreCompactionPriority() {
// If there's only L0, do what the default store does.
@@ -660,7 +696,7 @@ public class StripeStoreFileManager
this.isFlush = isFlush;
}
- public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
+ private void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
throws IOException {
assert this.compactedFiles == null && this.results == null;
this.compactedFiles = compactedFiles;
@@ -672,12 +708,20 @@ public class StripeStoreFileManager
processNewCandidateStripes(newStripes);
}
// Create new state and update parent.
- State state = createNewState();
+ State state = createNewState(false);
StripeStoreFileManager.this.state = state;
updateMetadataMaps();
}
- private State createNewState() {
+ private void deleteResults(Collection<StoreFile> compactedFiles) throws IOException {
+ this.compactedFiles = compactedFiles;
+ // Create new state and update parent.
+ State state = createNewState(true);
+ StripeStoreFileManager.this.state = state;
+ updateMetadataMaps();
+ }
+
+ private State createNewState(boolean delCompactedFiles) {
State oldState = StripeStoreFileManager.this.state;
// Stripe count should be the same unless the end rows changed.
assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
@@ -693,9 +737,21 @@ public class StripeStoreFileManager
}
List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
- if (!isFlush) newAllFiles.removeAll(compactedFiles);
- newAllFiles.addAll(results);
+ List<StoreFile> newAllCompactedFiles =
+ new ArrayList<StoreFile>(oldState.allCompactedFilesCached);
+ if (!isFlush) {
+ newAllFiles.removeAll(compactedFiles);
+ if (delCompactedFiles) {
+ newAllCompactedFiles.removeAll(compactedFiles);
+ } else {
+ newAllCompactedFiles.addAll(compactedFiles);
+ }
+ }
+ if (results != null) {
+ newAllFiles.addAll(results);
+ }
newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
+ newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles);
return newState;
}
@@ -946,14 +1002,16 @@ public class StripeStoreFileManager
// Order by seqnum is reversed.
for (int i = 1; i < stripe.size(); ++i) {
StoreFile sf = stripe.get(i);
- long fileTs = sf.getReader().getMaxTimestamp();
- if (fileTs < maxTs && !filesCompacting.contains(sf)) {
- LOG.info("Found an expired store file: " + sf.getPath()
- + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
- if (expiredStoreFiles == null) {
- expiredStoreFiles = new ArrayList();
+ synchronized (sf) {
+ long fileTs = sf.getReader().getMaxTimestamp();
+ if (fileTs < maxTs && !filesCompacting.contains(sf)) {
+ LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimeStamp is "
+ + fileTs + ", which is below " + maxTs);
+ if (expiredStoreFiles == null) {
+ expiredStoreFiles = new ArrayList<StoreFile>();
+ }
+ expiredStoreFiles.add(sf);
}
- expiredStoreFiles.add(sf);
}
}
return expiredStoreFiles;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 62e7c7c4857..3d62c676d86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -55,6 +55,8 @@ public class CompactionConfiguration {
public static final String HBASE_HSTORE_COMPACTION_MIN_SIZE_KEY =
"hbase.hstore.compaction.min.size";
public static final String HBASE_HSTORE_COMPACTION_MAX_KEY = "hbase.hstore.compaction.max";
+ public static final String HBASE_HSTORE_COMPACTION_DISCHARGER_THREAD_COUNT =
+ "hbase.hstore.compaction.discharger.thread.count";
public static final String HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY =
"hbase.hstore.compaction.max.size";
public static final String HBASE_HSTORE_COMPACTION_MAX_SIZE_OFFPEAK_KEY =
@@ -64,6 +66,11 @@ public class CompactionConfiguration {
public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT =
"hbase.hstore.min.locality.to.skip.major.compact";
+ public static final String HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT =
+ "hbase.hfile.compaction.discharger.thread.count";
+ public static final String HBASE_HFILE_COMPACTION_DISCHARGER_INTERVAL =
+ "hbase.hfile.compaction.discharger.interval";
+
Configuration conf;
StoreConfigInformation storeConfigInfo;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 8802ea6dc44..6f476f05493 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -108,6 +108,11 @@ public class MockRegionServerServices implements RegionServerServices {
return null;
}
+ @Override
+ public List getOnlineRegions() {
+ return null;
+ }
+
@Override
public void addToOnlineRegions(Region r) {
this.regions.put(r.getRegionInfo().getEncodedName(), r);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 99e364afde4..e592536107e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -196,17 +196,6 @@ public class TestIOFencing {
r = (CompactionBlockerRegion) region;
}
- @Override
- protected void completeCompaction(final Collection compactedFiles,
- boolean removeFiles) throws IOException {
- try {
- r.compactionsWaiting.countDown();
- r.compactionsBlocked.await();
- } catch (InterruptedException ex) {
- throw new IOException(ex);
- }
- super.completeCompaction(compactedFiles, removeFiles);
- }
@Override
protected void completeCompaction(Collection compactedFiles) throws IOException {
try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index e1304cf5d5c..a8713e73b43 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.backup.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@@ -43,7 +45,10 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -76,6 +81,7 @@ public class TestZooKeeperTableArchiveClient {
private static ZKTableArchiveClient archivingClient;
private final List toCleanup = new ArrayList();
private static ClusterConnection CONNECTION;
+ private static RegionServerServices rss;
/**
* Setup the config for the cluster
@@ -90,6 +96,7 @@ public class TestZooKeeperTableArchiveClient {
ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
ZKUtil.createWithParents(watcher, archivingZNode);
+ rss = mock(RegionServerServices.class);
}
private static void setupConf(Configuration conf) {
@@ -169,10 +176,14 @@ public class TestZooKeeperTableArchiveClient {
// create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
- Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
-
+ HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+ List regions = new ArrayList();
+ regions.add(region);
+ when(rss.getOnlineRegions()).thenReturn(regions);
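+ // Discharger chore that archives compacted-away store files; invoked manually below.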
+ final CompactedHFilesDischarger compactionCleaner =
+ new CompactedHFilesDischarger(100, stop, rss, false);
loadFlushAndCompact(region, TEST_FAM);
-
+ compactionCleaner.chore();
// get the current hfiles in the archive directory
List files = getAllFiles(fs, archiveDir);
if (files == null) {
@@ -216,18 +227,28 @@ public class TestZooKeeperTableArchiveClient {
HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
List cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);
-
// create the region
HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
- Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+ HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+ List regions = new ArrayList();
+ regions.add(region);
+ when(rss.getOnlineRegions()).thenReturn(regions);
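+ // Discharger for this region, run manually after the compaction so its files reach the archive.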
+ final CompactedHFilesDischarger compactionCleaner =
+ new CompactedHFilesDischarger(100, stop, rss, false);
loadFlushAndCompact(region, TEST_FAM);
-
+ compactionCleaner.chore();
// create the another table that we don't archive
hcd = new HColumnDescriptor(TEST_FAM);
- Region otherRegion = UTIL.createTestRegion(otherTable, hcd);
+ HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
+ regions = new ArrayList();
+ regions.add(otherRegion);
+ when(rss.getOnlineRegions()).thenReturn(regions);
+ final CompactedHFilesDischarger compactionCleaner1 = new CompactedHFilesDischarger(100, stop,
+ rss, false);
loadFlushAndCompact(otherRegion, TEST_FAM);
-
+ compactionCleaner1.chore();
// get the current hfiles in the archive directory
+ // The compacted files should have been archived by now
List files = getAllFiles(fs, archiveDir);
if (files == null) {
FSUtils.logFileSystemState(fs, archiveDir, LOG);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 13fcb242cc3..07ca2b911fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -74,7 +74,7 @@ public class TestHeapSize {
LOG.info("name=" + b.getName());
LOG.info("specname=" + b.getSpecName());
LOG.info("specvendor=" + b.getSpecVendor());
- LOG.info("vmname=" + b.getVmName());
+ LOG.info("vmname=" + b.getVmName());
LOG.info("vmversion=" + b.getVmVersion());
LOG.info("vmvendor=" + b.getVmVendor());
Map p = b.getSystemProperties();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
index b4bae992ea3..4c8bdc2ef4f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
@@ -71,7 +72,9 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -1032,6 +1035,12 @@ public class TestHFileOutputFormat {
try {
quickPoll(new Callable() {
public Boolean call() throws Exception {
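+ // Compacted files are no longer removed inline; archive them so only the compacted file is left.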
+ List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+ for (HRegion region : regions) {
+ for (Store store : region.getStores()) {
+ store.closeAndArchiveCompactedFiles();
+ }
+ }
return fs.listStatus(storePath).length == 1;
}
}, 5000);
@@ -1044,6 +1053,12 @@ public class TestHFileOutputFormat {
admin.majorCompact(TABLE_NAME.getName());
quickPoll(new Callable() {
public Boolean call() throws Exception {
+ List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+ for (HRegion region : regions) {
+ for (Store store : region.getStores()) {
+ store.closeAndArchiveCompactedFiles();
+ }
+ }
return fs.listStatus(storePath).length == 1;
}
}, 5000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index bb1a073e5f5..bc3da474fde 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -94,6 +96,8 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
+import com.google.common.collect.Lists;
+
/**
* Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}.
* Sets up and runs a mapreduce job that writes hfile output.
@@ -996,6 +1000,12 @@ public class TestHFileOutputFormat2 {
quickPoll(new Callable() {
@Override
public Boolean call() throws Exception {
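+ // Archive the compacted-away files so only the newly compacted file remains on disk.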
+ List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+ for (HRegion region : regions) {
+ for (Store store : region.getStores()) {
+ store.closeAndArchiveCompactedFiles();
+ }
+ }
return fs.listStatus(storePath).length == 1;
}
}, 5000);
@@ -1009,6 +1019,12 @@ public class TestHFileOutputFormat2 {
quickPoll(new Callable() {
@Override
public Boolean call() throws Exception {
+ List regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+ for (HRegion region : regions) {
+ for (Store store : region.getStores()) {
+ store.closeAndArchiveCompactedFiles();
+ }
+ }
return fs.listStatus(storePath).length == 1;
}
}, 5000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 8aa38d62e51..e2278663237 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -460,6 +460,11 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
return null;
}
+ @Override
+ public List getOnlineRegions() {
+ return null;
+ }
+
@Override
public OpenRegionResponse openRegion(RpcController controller,
OpenRegionRequest request) throws ServiceException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
index 08bafdc3f4b..42be207efd3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestSnapshotFromMaster.java
@@ -47,8 +47,10 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnaps
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
@@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -120,6 +123,7 @@ public class TestSnapshotFromMaster {
conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
ConstantSizeRegionSplitPolicy.class.getName());
+ conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
}
@@ -320,6 +324,17 @@ public class TestSnapshotFromMaster {
region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
region.compactStores(); // min is 2 so will compact and archive
}
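+ // Find a region server hosting the table and run the discharger so compacted files are archived.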
+ List regionServerThreads = UTIL.getMiniHBaseCluster()
+ .getRegionServerThreads();
+ HRegionServer hrs = null;
+ for (RegionServerThread rs : regionServerThreads) {
+ if (!rs.getRegionServer().getOnlineRegions(TABLE_NAME).isEmpty()) {
+ hrs = rs.getRegionServer();
+ break;
+ }
+ }
+ CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, hrs, false);
+ cleaner.chore();
LOG.info("After compaction File-System state");
FSUtils.logFileSystemState(fs, rootDir, LOG);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
index 3a12674b716..76e496de485 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java
@@ -78,6 +78,11 @@ public class MockStoreFile extends StoreFile {
return false;
}
+ @Override
+ public boolean isCompactedAway() {
+ return false;
+ }
+
@Override
public byte[] getMetadataValue(byte[] key) {
return this.metadata.get(key);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
index 0b4cbd21d3a..4ee765f1ba6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEncryptionKeyRotation.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
+import java.io.IOException;
import java.security.Key;
import java.security.SecureRandom;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import javax.crypto.spec.SecretKeySpec;
@@ -122,13 +124,14 @@ public class TestEncryptionKeyRotation {
// And major compact
TEST_UTIL.getHBaseAdmin().majorCompact(htd.getTableName());
+ final List updatePaths = findCompactedStorefilePaths(htd.getTableName());
TEST_UTIL.waitFor(30000, 1000, true, new Predicate() {
@Override
public boolean evaluate() throws Exception {
// When compaction has finished, all of the original files will be
// gone
boolean found = false;
- for (Path path: initialPaths) {
+ for (Path path: updatePaths) {
found = TEST_UTIL.getTestFileSystem().exists(path);
if (found) {
LOG.info("Found " + path);
@@ -140,14 +143,20 @@ public class TestEncryptionKeyRotation {
});
// Verify we have store file(s) with only the new key
+ Thread.sleep(1000);
+ waitForCompaction(htd.getTableName());
List pathsAfterCompaction = findStorefilePaths(htd.getTableName());
assertTrue(pathsAfterCompaction.size() > 0);
for (Path path: pathsAfterCompaction) {
- assertFalse("Store file " + path + " retains initial key",
- Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
assertTrue("Store file " + path + " has incorrect key",
Bytes.equals(secondCFKey.getEncoded(), extractHFileKey(path)));
}
+ List compactedPaths = findCompactedStorefilePaths(htd.getTableName());
+ assertTrue(compactedPaths.size() > 0);
+ for (Path path: compactedPaths) {
+ assertTrue("Store file " + path + " retains initial key",
+ Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
+ }
}
@Test
@@ -193,6 +202,33 @@ public class TestEncryptionKeyRotation {
}
}
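+ /**
+ * Waits until every store of the table has a single store file and that file is not marked
+ * as compacted away.
+ */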
+ private static void waitForCompaction(TableName tableName)
+ throws IOException, InterruptedException {
+ boolean compacted = false;
+ for (Region region : TEST_UTIL.getRSForFirstRegionInTable(tableName)
+ .getOnlineRegions(tableName)) {
+ for (Store store : region.getStores()) {
+ compacted = false;
+ while (!compacted) {
+ if (store.getStorefiles() != null) {
+ while (store.getStorefilesCount() != 1) {
+ Thread.sleep(100);
+ }
+ for (StoreFile storefile : store.getStorefiles()) {
+ if (!storefile.isCompactedAway()) {
+ compacted = true;
+ break;
+ }
+ Thread.sleep(100);
+ }
+ } else {
+ break;
+ }
+ }
+ }
+ }
+ }
+
private static List findStorefilePaths(TableName tableName) throws Exception {
List paths = new ArrayList();
for (Region region:
@@ -206,6 +242,23 @@ public class TestEncryptionKeyRotation {
return paths;
}
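+ /** Returns the paths of store files that were compacted away but have not been archived yet. */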
+ private static List findCompactedStorefilePaths(TableName tableName) throws Exception {
+ List paths = new ArrayList();
+ for (Region region:
+ TEST_UTIL.getRSForFirstRegionInTable(tableName).getOnlineRegions(tableName)) {
+ for (Store store : region.getStores()) {
+ Collection compactedfiles =
+ ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ if (compactedfiles != null) {
+ for (StoreFile storefile : compactedfiles) {
+ paths.add(storefile.getPath());
+ }
+ }
+ }
+ }
+ return paths;
+ }
+
private void createTableAndFlush(HTableDescriptor htd) throws Exception {
HColumnDescriptor hcd = htd.getFamilies().iterator().next();
// Create the test table
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 78b5b314e7f..851f5ed4227 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -166,9 +167,17 @@ public class TestHRegionReplayEvents {
when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
when(rss.getConfiguration()).thenReturn(CONF);
when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
-
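+ // Mock an executor service so compacted files discharger events can be handled during the test.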
+ String dischargerEvent = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
+ .toString();
+ ExecutorService es = new ExecutorService(dischargerEvent);
+ es.startExecutorService(dischargerEvent + "-" + dischargerEvent, 1);
+ when(rss.getExecutorService()).thenReturn(es);
primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
primaryRegion.close();
+ List regions = new ArrayList();
+ regions.add(primaryRegion);
+ when(rss.getOnlineRegions()).thenReturn(regions);
primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, null, CONF, rss, null);
@@ -1369,6 +1378,11 @@ public class TestHRegionReplayEvents {
// Test case 3: compact primary files
primaryRegion.compactStores();
+ List regions = new ArrayList();
+ regions.add(primaryRegion);
+ when(rss.getOnlineRegions()).thenReturn(regions);
+ CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, rss, false);
+ cleaner.chore();
secondaryRegion.refreshStoreFiles();
assertPathListsEqual(primaryRegion.getStoreFileList(families),
secondaryRegion.getStoreFileList(families));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index 0ae5851fe2a..3eb0a558c61 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.util.StringUtils;
@@ -180,7 +182,7 @@ public class TestRegionMergeTransactionOnCluster {
List> tableRegions = MetaTableAccessor
.getTableRegionsAndLocations(master.getZooKeeper(), master.getConnection(), tableName);
HRegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
- HTableDescriptor tableDescritor = master.getTableDescriptors().get(
+ HTableDescriptor tableDescriptor = master.getTableDescriptors().get(
tableName);
Result mergedRegionResult = MetaTableAccessor.getRegionResult(
master.getConnection(), mergedRegionInfo.getRegionName());
@@ -205,19 +207,46 @@ public class TestRegionMergeTransactionOnCluster {
assertTrue(fs.exists(regionAdir));
assertTrue(fs.exists(regionBdir));
- admin.compactRegion(mergedRegionInfo.getRegionName());
- // wait until merged region doesn't have reference file
- long timeout = System.currentTimeMillis() + waitTime;
+ HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
HRegionFileSystem hrfs = new HRegionFileSystem(
- TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
+ TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
+ int count = 0;
+ for(HColumnDescriptor colFamily : columnFamilies) {
+ count += hrfs.getStoreFiles(colFamily.getName()).size();
+ }
+ admin.compactRegion(mergedRegionInfo.getRegionName());
+ // clean up the merged region store files
+ // wait until the merged region has the new store file created by the compaction
+ long timeout = System.currentTimeMillis() + waitTime;
+ int newcount = 0;
while (System.currentTimeMillis() < timeout) {
- if (!hrfs.hasReferences(tableDescritor)) {
+ newcount = 0;
+ for (HColumnDescriptor colFamily : columnFamilies) {
+ newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+ }
+ if (newcount > count) {
+ break;
+ }
+ Thread.sleep(50);
+ }
+ assertTrue(newcount > count);
+ List regionServerThreads = TEST_UTIL.getHBaseCluster()
+ .getRegionServerThreads();
+ for (RegionServerThread rs : regionServerThreads) {
+ CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null,
+ rs.getRegionServer(), false);
+ cleaner.chore();
+ Thread.sleep(1000);
+ }
+ int newcount1 = 0;
+ while (System.currentTimeMillis() < timeout) {
+ newcount1 = 0;
+ for (HColumnDescriptor colFamily : columnFamilies) {
+ newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
+ }
+ if (newcount1 <= 1) {
break;
}
Thread.sleep(50);
}
- assertFalse(hrfs.hasReferences(tableDescritor));
-
// run CatalogJanitor to clean merge references in hbase:meta and archive the
// files of merging regions
int cleaned = admin.runCatalogScan();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 654135ebd4d..6693ca5d1c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.*;
import java.io.IOException;
+import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -254,6 +256,7 @@ public class TestRegionReplicas {
LOG.info("Flushing primary region");
Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
region.flush(true);
+ HRegion primaryRegion = (HRegion) region;
// ensure that chore is run
LOG.info("Sleeping for " + (4 * refreshPeriod));
@@ -283,7 +286,7 @@ public class TestRegionReplicas {
assertGetRpc(hriSecondary, 1042, true);
assertGetRpc(hriSecondary, 2042, true);
- // ensure that we are see the 3 store files
+ // ensure that we see the 3 store files
Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());
// force compaction
@@ -298,7 +301,8 @@ public class TestRegionReplicas {
}
// ensure that we see the compacted file only
- Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());
+ // This will be 4 until the cleaner chore runs
+ Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());
} finally {
HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);
@@ -457,7 +461,19 @@ public class TestRegionReplicas {
LOG.info("Force Major compaction on primary region " + hriPrimary);
primaryRegion.compact(true);
Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
-
+ List regionServerThreads = HTU.getMiniHBaseCluster()
+ .getRegionServerThreads();
+ HRegionServer hrs = null;
+ for (RegionServerThread rs : regionServerThreads) {
+ if (rs.getRegionServer()
+ .getOnlineRegion(primaryRegion.getRegionInfo().getRegionName()) != null) {
+ hrs = rs.getRegionServer();
+ break;
+ }
+ }
+ CompactedHFilesDischarger cleaner =
+ new CompactedHFilesDischarger(100, null, hrs, false);
+ cleaner.chore();
// scan all the hfiles on the secondary.
// since there are no read on the secondary when we ask locations to
// the NN a FileNotFound exception will be returned and the FileLink
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 6c3f946c409..72899a3a9ad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -1027,6 +1027,18 @@ public class TestStore {
store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
}
+ private void closeCompactedFile(int index) throws IOException {
+ Collection files =
+ this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
+ StoreFile sf = null;
+ Iterator it = files.iterator();
+ for (int i = 0; i <= index; i++) {
+ sf = it.next();
+ }
+ sf.closeReader(true);
+ store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
+ }
+
@Test
public void testRefreshStoreFiles() throws Exception {
init(name.getMethodName());
@@ -1054,6 +1066,7 @@ public class TestStore {
store.refreshStoreFiles();
assertEquals(5, this.store.getStorefilesCount());
+ closeCompactedFile(0);
archiveStoreFile(0);
assertEquals(5, this.store.getStorefilesCount());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index f59524f36e9..ba4ad3c2783 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -452,9 +452,9 @@ public class TestStoreScanner extends TestCase {
// normally cause an NPE because scan.store is null. So as long as we get through these
// two calls we are good and the bug was quashed.
- scan.updateReaders();
+ scan.updateReaders(new ArrayList());
- scan.updateReaders();
+ scan.updateReaders(new ArrayList());
scan.peek();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
index 36a726d06f4..a3d563166ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
@@ -137,6 +137,7 @@ public class TestStripeStoreFileManager {
MockStoreFile stripe0a = createFile(0, 100, OPEN_KEY, KEY_B),
stripe1 = createFile(KEY_B, OPEN_KEY);
manager.addCompactionResults(al(l0File), al(stripe0a, stripe1));
+ manager.removeCompactedFiles(al(l0File));
// If we want a key <= KEY_A, we should get everything except stripe1.
ArrayList sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_A));
assertEquals(2, sfsDump.size());
@@ -162,6 +163,7 @@ public class TestStripeStoreFileManager {
// a candidate from the first file, the old one should not be removed.
StoreFile stripe0b = createFile(0, 101, OPEN_KEY, KEY_B);
manager.addCompactionResults(al(l0File2), al(stripe0b));
+ manager.removeCompactedFiles(al(l0File2));
sfs = manager.getCandidateFilesForRowKeyBefore(KV_A);
assertEquals(stripe0b, sfs.next());
sfs.remove();
@@ -350,10 +352,12 @@ public class TestStripeStoreFileManager {
// Here, [B, C] is logically [B, inf), so we should be able to compact it to that only.
verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
manager.addCompactionResults(al(sf), al(createFile(KEY_B, OPEN_KEY)));
+ manager.removeCompactedFiles(al(sf));
// Do the same for other variants.
manager = createManager(al(sf, createFile(KEY_C, OPEN_KEY)));
verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, KEY_C)));
+ manager.removeCompactedFiles(al(sf));
manager = createManager(al(sf));
verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, OPEN_KEY)));
@@ -379,6 +383,7 @@ public class TestStripeStoreFileManager {
StoreFile sf_B2C_0 = createFile(KEY_B, KEY_C);
StoreFile sf_C2i_0 = createFile(KEY_C, OPEN_KEY);
manager.addCompactionResults(al(sf_L0_0a), al(sf_i2B_0, sf_B2C_0, sf_C2i_0));
+ manager.removeCompactedFiles(al(sf_L0_0a));
verifyAllFiles(manager, al(sf_L0_0b, sf_i2B_0, sf_B2C_0, sf_C2i_0));
// Add another l0 file, "compact" both L0 into two stripes
@@ -387,51 +392,61 @@ public class TestStripeStoreFileManager {
StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C);
manager.insertNewFiles(al(sf_L0_1));
manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1));
+ manager.removeCompactedFiles(al(sf_L0_0b, sf_L0_1));
verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1));
// Try compacting with invalid file (no metadata) - should add files to L0.
StoreFile sf_L0_2 = createFile(null, null);
manager.addCompactionResults(al(), al(sf_L0_2));
+ manager.removeCompactedFiles(al());
verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1, sf_L0_2));
// Remove it...
manager.addCompactionResults(al(sf_L0_2), al());
+ manager.removeCompactedFiles(al(sf_L0_2));
// Do regular compaction in the first stripe.
StoreFile sf_i2B_3 = createFile(OPEN_KEY, KEY_B);
manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3));
+ manager.removeCompactedFiles(al(sf_i2B_0, sf_i2B_1));
verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3));
// Rebalance two stripes.
StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D);
StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY);
manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4));
+ manager.removeCompactedFiles(al(sf_B2C_0, sf_C2i_0, sf_B2C_1));
verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4));
// Split the first stripe.
StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A);
StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B);
manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5));
+ manager.removeCompactedFiles(al(sf_i2B_3));
verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5));
// Split the middle stripe.
StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C);
StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D);
manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6));
+ manager.removeCompactedFiles(al(sf_B2D_4));
verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6));
// Merge two different middle stripes.
StoreFile sf_A2C_7 = createFile(KEY_A, KEY_C);
manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7));
+ manager.removeCompactedFiles(al(sf_A2B_5, sf_B2C_6));
verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7));
// Merge lower half.
StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C);
manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8));
+ manager.removeCompactedFiles(al(sf_i2A_5, sf_A2C_7));
verifyAllFiles(manager, al(sf_D2i_4, sf_C2D_6, sf_i2C_8));
// Merge all.
StoreFile sf_i2i_9 = createFile(OPEN_KEY, OPEN_KEY);
manager.addCompactionResults(al(sf_D2i_4, sf_C2D_6, sf_i2C_8), al(sf_i2i_9));
+ manager.removeCompactedFiles(al(sf_D2i_4, sf_C2D_6, sf_i2C_8));
verifyAllFiles(manager, al(sf_i2i_9));
}
@@ -451,12 +466,14 @@ public class TestStripeStoreFileManager {
verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i);
// Remove these files.
sfm.addCompactionResults(al(sf_i2d, sf_d2i), al());
+ sfm.removeCompactedFiles(al(sf_i2d, sf_d2i));
assertEquals(0, sfm.getLevel0Files().size());
// Add another file to stripe; then "rebalance" stripes w/o it - the file, which was
// presumably flushed during compaction, should go to L0.
StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C);
sfm.insertNewFiles(al(sf_i2c_2));
sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i));
+ sfm.removeCompactedFiles(al(sf_i2c, sf_c2i));
assertEquals(1, sfm.getLevel0Files().size());
verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2);
}
@@ -472,9 +489,11 @@ public class TestStripeStoreFileManager {
ArrayList compacted = al(createFile(OPEN_KEY, KEY_B),
createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY));
manager.addCompactionResults(al(sf0a), compacted);
+ manager.removeCompactedFiles(al(sf0a));
// Next L0 compaction only produces file for the first and last stripe.
ArrayList compacted2 = al(createFile(OPEN_KEY, KEY_B), createFile(KEY_C, OPEN_KEY));
manager.addCompactionResults(al(sf0b), compacted2);
+ manager.removeCompactedFiles(al(sf0b));
compacted.addAll(compacted2);
verifyAllFiles(manager, compacted);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
index 5a451909390..7e866323e40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
@@ -125,7 +125,7 @@ public class TestWideScanner extends HBaseTestCase {
((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
while (scanners.hasNext()) {
StoreScanner ss = (StoreScanner)scanners.next();
- ss.updateReaders();
+ ss.updateReaders(new ArrayList());
}
} while (more);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
new file mode 100644
index 00000000000..c23e7942578
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
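+/**
+ * Tests that the compacted HFiles discharger chore removes compacted-away store files only when
+ * no scanner is still reading them.
+ */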
+@Category({ MediumTests.class, RegionServerTests.class })
+public class TestCompactedHFilesDischarger {
+ private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
+ private Region region;
+ private final static byte[] fam = Bytes.toBytes("cf_1");
+ private final static byte[] qual1 = Bytes.toBytes("qf_1");
+ private final static byte[] val = Bytes.toBytes("val");
+ private static CountDownLatch latch = new CountDownLatch(3);
+ private static AtomicInteger counter = new AtomicInteger(0);
+ private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
+ private RegionServerServices rss;
+
+ @Before
+ public void setUp() throws Exception {
+ TableName tableName = TableName.valueOf(getClass().getSimpleName());
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(fam));
+ HRegionInfo info = new HRegionInfo(tableName, null, null, false);
+ Path path = testUtil.getDataTestDir(getClass().getSimpleName());
+ region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
+ rss = mock(RegionServerServices.class);
+ List regions = new ArrayList();
+ regions.add(region);
+ when(rss.getOnlineRegions()).thenReturn(regions);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ counter.set(0);
+ scanCompletedCounter.set(0);
+ latch = new CountDownLatch(3);
+ HBaseTestingUtility.closeRegionAndWAL(region);
+ testUtil.cleanupTestDir();
+ }
+
+ @Test
+ public void testCompactedHFilesCleaner() throws Exception {
+ // Create the cleaner object
+ CompactedHFilesDischarger cleaner =
+ new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+ // Add some data to the region and do some flushes
+ for (int i = 1; i < 10; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(fam, qual1, val);
+ region.put(p);
+ }
+ // flush them
+ region.flush(true);
+ for (int i = 11; i < 20; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(fam, qual1, val);
+ region.put(p);
+ }
+ // flush them
+ region.flush(true);
+ for (int i = 21; i < 30; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(fam, qual1, val);
+ region.put(p);
+ }
+ // flush them
+ region.flush(true);
+
+ Store store = region.getStore(fam);
+ assertEquals(3, store.getStorefilesCount());
+
+ Collection storefiles = store.getStorefiles();
+ Collection compactedfiles =
+ ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ // None of the files should be in compacted state.
+ for (StoreFile file : storefiles) {
+ assertFalse(file.isCompactedAway());
+ }
+ // Try to run the cleaner without compaction; there should not be any change
+ cleaner.chore();
+ storefiles = store.getStorefiles();
+ // None of the files should be in compacted state.
+ for (StoreFile file : storefiles) {
+ assertFalse(file.isCompactedAway());
+ }
+ // now do some compaction
+ region.compact(true);
+ // The flushed files should still be present until the cleaner runs, but they should now be
+ // marked as compacted away
+ assertEquals(1, store.getStorefilesCount());
+ assertEquals(3,
+ ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+
+ // Run the cleaner
+ cleaner.chore();
+ assertEquals(1, store.getStorefilesCount());
+ storefiles = store.getStorefiles();
+ for (StoreFile file : storefiles) {
+ // Should not be in compacted state
+ assertFalse(file.isCompactedAway());
+ }
+ compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ assertTrue(compactedfiles.size() == 0);
+
+ }
+
+ @Test
+ public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
+ // Create the cleaner object
+ CompactedHFilesDischarger cleaner =
+ new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+ // Add some data to the region and do some flushes
+ for (int i = 1; i < 10; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(fam, qual1, val);
+ region.put(p);
+ }
+ // flush them
+ region.flush(true);
+ for (int i = 11; i < 20; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(fam, qual1, val);
+ region.put(p);
+ }
+ // flush them
+ region.flush(true);
+ for (int i = 21; i < 30; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(fam, qual1, val);
+ region.put(p);
+ }
+ // flush them
+ region.flush(true);
+
+ Store store = region.getStore(fam);
+ assertEquals(3, store.getStorefilesCount());
+
+ Collection storefiles = store.getStorefiles();
+ Collection compactedfiles =
+ ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ // None of the files should be in compacted state.
+ for (StoreFile file : storefiles) {
+ assertFalse(file.isCompactedAway());
+ }
+ // Do compaction
+ region.compact(true);
+ startScannerThreads();
+
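+ // These scanners were opened after the compaction, so they only reference the new store file.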
+ storefiles = store.getStorefiles();
+ int usedReaderCount = 0;
+ int unusedReaderCount = 0;
+ for (StoreFile file : storefiles) {
+ if (file.getRefCount() == 3) {
+ usedReaderCount++;
+ }
+ }
+ compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ for (StoreFile file : compactedfiles) {
+ assertEquals("Refcount should be 0", 0, file.getRefCount());
+ unusedReaderCount++;
+ }
+ // Though the compacted files are still present, no scanner is using them for reads
+ assertEquals("unused reader count should be 3", 3, unusedReaderCount);
+ assertEquals("used reader count should be 1", 1, usedReaderCount);
+ // now run the cleaner
+ cleaner.chore();
+ countDown();
+ assertEquals(1, store.getStorefilesCount());
+ storefiles = store.getStorefiles();
+ for (StoreFile file : storefiles) {
+ // Should not be in compacted state
+ assertFalse(file.isCompactedAway());
+ }
+ compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ assertTrue(compactedfiles.size() == 0);
+ }
+
+ @Test
+ public void testCleanerWithParallelScanners() throws Exception {
+ // Create the cleaner object
+ CompactedHFilesDischarger cleaner =
+ new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+ // Add some data to the region and do some flushes
+ for (int i = 1; i < 10; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(fam, qual1, val);
+ region.put(p);
+ }
+ // flush them
+ region.flush(true);
+ for (int i = 11; i < 20; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(fam, qual1, val);
+ region.put(p);
+ }
+ // flush them
+ region.flush(true);
+ for (int i = 21; i < 30; i++) {
+ Put p = new Put(Bytes.toBytes("row" + i));
+ p.addColumn(fam, qual1, val);
+ region.put(p);
+ }
+ // flush them
+ region.flush(true);
+
+ Store store = region.getStore(fam);
+ assertEquals(3, store.getStorefilesCount());
+
+ Collection storefiles = store.getStorefiles();
+ Collection compactedfiles =
+ ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ // None of the files should be in compacted state.
+ for (StoreFile file : storefiles) {
+ assertFalse(file.isCompactedAway());
+ }
+ startScannerThreads();
+ // Do compaction
+ region.compact(true);
+
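+ // These scanners were opened before the compaction, so they still reference the old store files.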
+ storefiles = store.getStorefiles();
+ int usedReaderCount = 0;
+ int unusedReaderCount = 0;
+ for (StoreFile file : storefiles) {
+ if (file.getRefCount() == 0) {
+ unusedReaderCount++;
+ }
+ }
+ compactedfiles =
+ ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ for (StoreFile file : compactedfiles) {
+ assertEquals("Refcount should be 3", 3, file.getRefCount());
+ usedReaderCount++;
+ }
+ // The newly compacted file will not be used by any scanner
+ assertEquals("unused reader count should be 1", 1, unusedReaderCount);
+ assertEquals("used reader count should be 3", 3, usedReaderCount);
+ // now run the cleaner
+ cleaner.chore();
+ countDown();
+ // No change in the number of store files as none of the compacted files could be cleaned up
+ assertEquals(1, store.getStorefilesCount());
+ assertEquals(3,
+ ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+ while (scanCompletedCounter.get() != 3) {
+ Thread.sleep(100);
+ }
+ // reset
+ latch = new CountDownLatch(3);
+ scanCompletedCounter.set(0);
+ counter.set(0);
+ // Create new scanners; they should use only the new file created after the compaction
+ startScannerThreads();
+ storefiles = store.getStorefiles();
+ usedReaderCount = 0;
+ unusedReaderCount = 0;
+ for (StoreFile file : storefiles) {
+ if (file.getRefCount() == 3) {
+ usedReaderCount++;
+ }
+ }
+ compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ for (StoreFile file : compactedfiles) {
+ assertEquals("Refcount should be 0", 0, file.getRefCount());
+ unusedReaderCount++;
+ }
+ // Though the compacted files are still present, no scanner is using them for reads
+ assertEquals("unused reader count should be 3", 3, unusedReaderCount);
+ assertEquals("used reader count should be 1", 1, usedReaderCount);
+ countDown();
+ while (scanCompletedCounter.get() != 3) {
+ Thread.sleep(100);
+ }
+ // Run the cleaner again
+ cleaner.chore();
+ // Now the cleaner should be able to clear them up because there are no active readers
+ assertEquals(1, store.getStorefilesCount());
+ storefiles = store.getStorefiles();
+ for (StoreFile file : storefiles) {
+ // Should not be in compacted state
+ assertFalse(file.isCompactedAway());
+ }
+ compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+ assertTrue(compactedfiles.size() == 0);
+ }
+
+ protected void countDown() {
+ // count down 3 times
+ latch.countDown();
+ latch.countDown();
+ latch.countDown();
+ }
+
+ protected void startScannerThreads() throws InterruptedException {
+ // Start parallel scan threads
+ ScanThread[] scanThreads = new ScanThread[3];
+ for (int i = 0; i < 3; i++) {
+ scanThreads[i] = new ScanThread((HRegion) region);
+ }
+ for (ScanThread thread : scanThreads) {
+ thread.start();
+ }
+ while (counter.get() != 3) {
+ Thread.sleep(100);
+ }
+ }
+
+ private static class ScanThread extends Thread {
+ private final HRegion region;
+
+ public ScanThread(HRegion region) {
+ this.region = region;
+ }
+
+ @Override
+ public void run() {
+ try {
+ initiateScan(region);
+ } catch (IOException e) {
+ // do nothing
+ }
+ }
+
+ private void initiateScan(HRegion region) throws IOException {
+ Scan scan = new Scan();
+ scan.setCaching(1);
+ RegionScanner resScanner = null;
+ try {
+ resScanner = region.getScanner(scan);
+ List results = new ArrayList();
+ boolean next = resScanner.next(results);
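+ // Keep the scanner open until the test has inspected reference counts and released the latch.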
+ try {
+ counter.incrementAndGet();
+ latch.await();
+ } catch (InterruptedException e) {
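+ // ignore; the scanner only needs to stay open until the latch is released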
+ }
+ // drain any remaining rows
+ while (next) {
+ next = resScanner.next(results);
+ }
+ } finally {
+ scanCompletedCounter.incrementAndGet();
+ if (resScanner != null) {
+ resScanner.close();
+ }
+ }
+ }
+ }
+}