HBASE-13082 Coarsen StoreScanner locks to RegionScanner (Ram)
commit 8b3d1f1444
parent b3260423b1

DefaultStoreFileManager.java

@@ -54,6 +54,13 @@ class DefaultStoreFileManager implements StoreFileManager {
    * is atomically replaced when its contents change.
    */
   private volatile ImmutableList<StoreFile> storefiles = null;
+  /**
+   * List of compacted files inside this store that needs to be excluded in reads
+   * because further new reads will be using only the newly created files out of compaction.
+   * These compacted files will be deleted/cleared once all the existing readers on these
+   * compacted files are done.
+   */
+  private volatile List<StoreFile> compactedfiles = null;
 
   public DefaultStoreFileManager(CellComparator kvComparator, Configuration conf,
       CompactionConfiguration comConf) {

@@ -74,6 +81,11 @@ class DefaultStoreFileManager implements StoreFileManager {
     return storefiles;
   }
 
+  @Override
+  public Collection<StoreFile> getCompactedfiles() {
+    return compactedfiles;
+  }
+
   @Override
   public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
     ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);

@@ -88,6 +100,13 @@ class DefaultStoreFileManager implements StoreFileManager {
     return result;
   }
 
+  @Override
+  public Collection<StoreFile> clearCompactedFiles() {
+    List<StoreFile> result = compactedfiles;
+    compactedfiles = new ArrayList<StoreFile>();
+    return result;
+  }
+
   @Override
   public final int getStorefileCount() {
     return storefiles.size();

@@ -95,13 +114,42 @@ class DefaultStoreFileManager implements StoreFileManager {
 
   @Override
   public void addCompactionResults(
-    Collection<StoreFile> compactedFiles, Collection<StoreFile> results) {
+    Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {
     ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
-    newStoreFiles.removeAll(compactedFiles);
+    newStoreFiles.removeAll(newCompactedfiles);
     if (!results.isEmpty()) {
       newStoreFiles.addAll(results);
     }
     sortAndSetStoreFiles(newStoreFiles);
+    ArrayList<StoreFile> updatedCompactedfiles = null;
+    if (this.compactedfiles != null) {
+      updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles);
+      updatedCompactedfiles.addAll(newCompactedfiles);
+    } else {
+      updatedCompactedfiles = new ArrayList<StoreFile>(newCompactedfiles);
+    }
+    markCompactedAway(newCompactedfiles);
+    this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
+  }
+
+  // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised
+  // Let a background thread close the actual reader on these compacted files and also
+  // ensure to evict the blocks from block cache so that they are no longer in
+  // cache
+  private void markCompactedAway(Collection<StoreFile> compactedFiles) {
+    for (StoreFile file : compactedFiles) {
+      file.markCompactedAway();
+    }
+  }
+
+  @Override
+  public void removeCompactedFiles(Collection<StoreFile> removedCompactedfiles) throws IOException {
+    ArrayList<StoreFile> updatedCompactedfiles = null;
+    if (this.compactedfiles != null) {
+      updatedCompactedfiles = new ArrayList<StoreFile>(this.compactedfiles);
+      updatedCompactedfiles.removeAll(removedCompactedfiles);
+      this.compactedfiles = sortCompactedfiles(updatedCompactedfiles);
+    }
   }
 
   @Override

@@ -166,6 +214,12 @@ class DefaultStoreFileManager implements StoreFileManager {
     storefiles = ImmutableList.copyOf(storeFiles);
   }
 
+  private List<StoreFile> sortCompactedfiles(List<StoreFile> storefiles) {
+    // Sorting may not be really needed here for the compacted files?
+    Collections.sort(storefiles, StoreFile.Comparators.SEQ_ID);
+    return new ArrayList<StoreFile>(storefiles);
+  }
+
   @Override
   public double getCompactionPressure() {
     int storefileCount = getStorefileCount();
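The manager above keeps compacted-but-not-yet-archived inputs in a second volatile list next to storefiles, rebuilding the whole list and swapping it in one assignment so readers never take a lock. A minimal sketch of that copy-on-write bookkeeping, using illustrative names rather than the actual HBase types:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.List;

    // Readers see a volatile snapshot; writers rebuild the list and swap it atomically.
    class CompactedFileTracker<F> {
      private volatile List<F> compactedFiles = Collections.emptyList();

      // Called after a compaction finishes: retain the newly compacted-away inputs.
      synchronized void addCompacted(Collection<F> newlyCompacted) {
        List<F> updated = new ArrayList<>(compactedFiles);
        updated.addAll(newlyCompacted);
        compactedFiles = Collections.unmodifiableList(updated);
      }

      // Called by the background discharger once the files have been archived.
      synchronized void removeCompacted(Collection<F> archived) {
        List<F> updated = new ArrayList<>(compactedFiles);
        updated.removeAll(archived);
        compactedFiles = Collections.unmodifiableList(updated);
      }

      // Readers never lock; they only read the current snapshot.
      List<F> snapshot() {
        return compactedFiles;
      }
    }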

HRegion.java

@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.DroppedSnapshotException;

@@ -148,6 +149,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;

@@ -297,6 +299,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
   protected final Configuration conf;
   private final Configuration baseConf;
   private final int rowLockWaitDuration;
+  private CompactedHFilesDischarger compactedFileDischarger;
   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
 
   // The internal wait duration to acquire a lock before read/update

@@ -809,6 +812,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
     // Initialize all the HStores
     status.setStatus("Initializing all the Stores");
     long maxSeqId = initializeStores(reporter, status);
+    // Start the CompactedHFilesDischarger here. This chore helps to remove the compacted files
+    // that will no longer be used in reads.
+    if (this.getRegionServerServices() != null) {
+      ChoreService choreService = this.getRegionServerServices().getChoreService();
+      if (choreService != null) {
+        // Default is 2 mins. The default value for TTLCleaner is 5 mins so we set this to
+        // 2 mins so that compacted files can be archived before the TTLCleaner runs
+        int cleanerInterval =
+            conf.getInt("hbase.hfile.compactions.cleaner.interval", 2 * 60 * 1000);
+        this.compactedFileDischarger =
+            new CompactedHFilesDischarger(cleanerInterval, this.getRegionServerServices(), this);
+        choreService.scheduleChore(compactedFileDischarger);
+      }
+    }
     this.mvcc.advanceTo(maxSeqId);
     if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
       // Recover any edits if available.

@@ -1513,6 +1530,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
     if (this.metricsRegionWrapper != null) {
       Closeables.closeQuietly(this.metricsRegionWrapper);
     }
+    // stop the Compacted hfile discharger
+    if (this.compactedFileDischarger != null) this.compactedFileDischarger.cancel(true);
+
     status.markComplete("Closed");
     LOG.info("Closed " + this);
     return result;

@@ -6658,6 +6678,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
       dstRegion.getRegionFileSystem().logFileSystemState(LOG);
     }
 
+    // clear the compacted files if any
+    for (Store s : dstRegion.getStores()) {
+      s.closeAndArchiveCompactedFiles();
+    }
     if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
       throw new IOException("Merged region " + dstRegion
           + " still has references after the compaction, is compaction canceled?");

@@ -7527,7 +7551,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      43 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
+      44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
       (14 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
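The discharger added above runs as a periodic chore whose interval (default 2 minutes via "hbase.hfile.compactions.cleaner.interval") is deliberately shorter than the 5-minute HFile TTL cleaner, so compacted files get archived before the cleaner inspects them. A rough sketch of the same scheduling pattern, using a plain ScheduledExecutorService as a stand-in for HBase's ChoreService; the class and method names here are illustrative, not part of the commit:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Run the per-store cleanup on a fixed interval; one store's failure must not
    // stop the chore for the other stores.
    class CompactedFilesCleanupScheduler {
      private final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();

      void start(Iterable<Runnable> perStoreCleanups, long intervalMillis) {
        pool.scheduleAtFixedRate(() -> {
          for (Runnable cleanup : perStoreCleanups) {
            try {
              cleanup.run();          // e.g. a call like store.closeAndArchiveCompactedFiles()
            } catch (RuntimeException e) {
              // swallow and keep the chore alive for the remaining stores
            }
          }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
      }

      void stop() {
        pool.shutdownNow();
      }
    }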

HStore.java

@@ -39,7 +39,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -76,6 +78,7 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

@@ -90,7 +93,7 @@ import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;

@@ -138,6 +141,8 @@ public class HStore implements Store {
   static int closeCheckInterval = 0;
   private volatile long storeSize = 0L;
   private volatile long totalUncompressedBytes = 0L;
+  private ThreadPoolExecutor compactionCleanerthreadPoolExecutor = null;
+  private CompletionService<StoreFile> completionService = null;
 
   /**
    * RWLock for store operations.

@@ -181,7 +186,6 @@ public class HStore implements Store {
 
   private long blockingFileCount;
   private int compactionCheckMultiplier;
-
   protected Encryption.Context cryptoContext = Encryption.Context.NONE;
 
   private volatile long flushedCellsCount = 0;

@@ -272,7 +276,10 @@ public class HStore implements Store {
           "hbase.hstore.flush.retries.number must be > 0, not "
               + flushRetriesNumber);
     }
+    compactionCleanerthreadPoolExecutor = getThreadPoolExecutor(
+        conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 10));
+    completionService =
+        new ExecutorCompletionService<StoreFile>(compactionCleanerthreadPoolExecutor);
     // Crypto context for new store files
     String cipherName = family.getEncryptionType();
     if (cipherName != null) {

@@ -551,14 +558,15 @@ public class HStore implements Store {
       try {
         Future<StoreFile> future = completionService.take();
         StoreFile storeFile = future.get();
+        if (storeFile != null) {
           long length = storeFile.getReader().length();
           this.storeSize += length;
-          this.totalUncompressedBytes +=
-              storeFile.getReader().getTotalUncompressedBytes();
+          this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
           if (LOG.isDebugEnabled()) {
             LOG.debug("loaded " + storeFile.toStringDetailed());
           }
           results.add(storeFile);
+        }
       } catch (InterruptedException e) {
         if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
       } catch (ExecutionException e) {

@@ -656,8 +664,7 @@ public class HStore implements Store {
       region.getMVCC().advanceTo(this.getMaxSequenceId());
     }
 
-    // notify scanners, close file readers, and recompute store size
-    completeCompaction(toBeRemovedStoreFiles, false);
+    completeCompaction(toBeRemovedStoreFiles);
   }
 
   private StoreFile createStoreFileAndReader(final Path p) throws IOException {

@@ -834,7 +841,6 @@ public class HStore implements Store {
       // the lock.
       this.lock.writeLock().unlock();
     }
-    notifyChangedReadersObservers();
     LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
     if (LOG.isTraceEnabled()) {
       String traceMessage = "BULK LOAD time,size,store size,store files ["

@@ -850,7 +856,10 @@ public class HStore implements Store {
     try {
       // Clear so metrics doesn't find them.
       ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
+      Collection<StoreFile> compactedfiles =
+          storeEngine.getStoreFileManager().clearCompactedFiles();
+      // clear the compacted files
+      removeCompactedFiles(compactedfiles);
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
         ThreadPoolExecutor storeFileCloserThreadPool = this.region

@@ -892,6 +901,9 @@ public class HStore implements Store {
         }
         if (ioe != null) throw ioe;
       }
+      if (compactionCleanerthreadPoolExecutor != null) {
+        compactionCleanerthreadPoolExecutor.shutdownNow();
+      }
       LOG.info("Closed " + this);
       return result;
     } finally {

@@ -1087,10 +1099,8 @@ public class HStore implements Store {
       // the lock.
       this.lock.writeLock().unlock();
     }
-    // Tell listeners of the change in readers.
+    // notify to be called here - only in case of flushes
     notifyChangedReadersObservers();
-
     if (LOG.isTraceEnabled()) {
       long totalSize = 0;
       for (StoreFile sf : sfs) {

@@ -1268,7 +1278,7 @@ public class HStore implements Store {
       compactedCellsSize += getCompactionProgress().totalCompactedSize;
     }
     // At this point the store will use new files for all new scanners.
-    completeCompaction(filesToCompact, true); // Archive old files & update store size.
+    completeCompaction(filesToCompact); // update store size.
 
     logCompactionEndMessage(cr, sfs, compactionStartTime);
     return sfs;

@@ -1456,7 +1466,7 @@ public class HStore implements Store {
       LOG.info("Replaying compaction marker, replacing input files: " +
           inputStoreFiles + " with output files : " + outputStoreFiles);
       this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
-      this.completeCompaction(inputStoreFiles, removeFiles);
+      this.completeCompaction(inputStoreFiles);
     }
   }
 

@@ -1508,7 +1518,7 @@ public class HStore implements Store {
           this.getCoprocessorHost().postCompact(this, sf, null);
         }
         replaceStoreFiles(filesToCompact, Lists.newArrayList(sf));
-        completeCompaction(filesToCompact, true);
+        completeCompaction(filesToCompact);
       }
     } finally {
       synchronized (filesCompacting) {

@@ -1770,54 +1780,7 @@ public class HStore implements Store {
   @VisibleForTesting
   protected void completeCompaction(final Collection<StoreFile> compactedFiles)
     throws IOException {
-    completeCompaction(compactedFiles, true);
-  }
-
-  /**
-   * <p>It works by processing a compaction that's been written to disk.
-   *
-   * <p>It is usually invoked at the end of a compaction, but might also be
-   * invoked at HStore startup, if the prior execution died midway through.
-   *
-   * <p>Moving the compacted TreeMap into place means:
-   * <pre>
-   * 1) Unload all replaced StoreFile, close and collect list to delete.
-   * 2) Compute new store size
-   * </pre>
-   *
-   * @param compactedFiles list of files that were compacted
-   */
-  @VisibleForTesting
-  protected void completeCompaction(final Collection<StoreFile> compactedFiles, boolean removeFiles)
-    throws IOException {
-    try {
-      // Do not delete old store files until we have sent out notification of
-      // change in case old files are still being accessed by outstanding scanners.
-      // Don't do this under writeLock; see HBASE-4485 for a possible deadlock
-      // scenario that could have happened if continue to hold the lock.
-      notifyChangedReadersObservers();
-      // At this point the store will use new files for all scanners.
-
-      // let the archive util decide if we should archive or delete the files
-      LOG.debug("Removing store files after compaction...");
-      boolean evictOnClose =
-          cacheConf != null? cacheConf.shouldEvictOnClose(): true;
-      for (StoreFile compactedFile : compactedFiles) {
-        compactedFile.closeReader(evictOnClose);
-      }
-      if (removeFiles) {
-        this.fs.removeStoreFiles(this.getColumnFamilyName(), compactedFiles);
-      }
-    } catch (IOException e) {
-      e = e instanceof RemoteException ?
-              ((RemoteException)e).unwrapRemoteException() : e;
-      LOG.error("Failed removing compacted files in " + this +
-          ". Files we were trying to remove are " + compactedFiles.toString() +
-          "; some of them may have been already removed", e);
-    }
-
-    // 4. Compute new store size
+    LOG.debug("Completing compaction...");
     this.storeSize = 0L;
     this.totalUncompressedBytes = 0L;
     for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {

@@ -2248,7 +2211,7 @@ public class HStore implements Store {
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
+      ClassSize.align(ClassSize.OBJECT + (18 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
           + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD

@@ -2365,4 +2328,112 @@ public class HStore implements Store {
   public boolean isPrimaryReplicaStore() {
     return getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID;
   }
+
+  @Override
+  public void closeAndArchiveCompactedFiles() throws IOException {
+    lock.readLock().lock();
+    Collection<StoreFile> copyCompactedfiles = null;
+    try {
+      Collection<StoreFile> compactedfiles =
+          this.getStoreEngine().getStoreFileManager().getCompactedfiles();
+      if (compactedfiles != null && compactedfiles.size() != 0) {
+        // Do a copy under read lock
+        copyCompactedfiles = new ArrayList<StoreFile>(compactedfiles);
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("No compacted files to archive");
+          return;
+        }
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+    removeCompactedFiles(copyCompactedfiles);
+  }
+
+  private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) {
+    return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS,
+        new ThreadFactory() {
+          private int count = 1;
+
+          @Override
+          public Thread newThread(Runnable r) {
+            return new Thread(r, "CompactedfilesArchiver-" + count++);
+          }
+        });
+  }
+
+  private void removeCompactedFiles(Collection<StoreFile> compactedfiles) throws IOException {
+    if (compactedfiles != null && !compactedfiles.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Removing the compacted store files " + compactedfiles);
+      }
+      for (final StoreFile file : compactedfiles) {
+        completionService.submit(new Callable<StoreFile>() {
+          @Override
+          public StoreFile call() throws IOException {
+            synchronized (file) {
+              try {
+                StoreFile.Reader r = file.getReader();
+                if (r == null) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("The file " + file + " was closed but still not archived.");
+                  }
+                  return file;
+                }
+                if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+                  // Even if deleting fails we need not bother as any new scanners won't be
+                  // able to use the compacted file as the status is already compactedAway
+                  if (LOG.isTraceEnabled()) {
+                    LOG.trace("Closing and archiving the file " + file.getPath());
+                  }
+                  r.close(true);
+                  // Just close and return
+                  return file;
+                }
+              } catch (Exception e) {
+                LOG.error("Exception while trying to close the compacted store file "
+                    + file.getPath().getName());
+              }
+            }
+            return null;
+          }
+        });
+      }
+      final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
+      try {
+        for (final StoreFile file : compactedfiles) {
+          Future<StoreFile> future = completionService.take();
+          StoreFile closedFile = future.get();
+          if (closedFile != null) {
+            filesToRemove.add(closedFile);
+          }
+        }
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted exception while closing the compacted files", ie);
+      } catch (Exception e) {
+        LOG.error("Exception occured while closing the compacted files", e);
+      }
+      if (isPrimaryReplicaStore()) {
+        archiveAndRemoveCompactedFiles(filesToRemove);
+      }
+    }
+  }
+
+  private void archiveAndRemoveCompactedFiles(List<StoreFile> filesToArchive) throws IOException {
+    if (!filesToArchive.isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Moving the files " + filesToArchive + " to archive");
+      }
+      // Only if this is successful it has to be removed
+      this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToArchive);
+      try {
+        lock.writeLock().lock();
+        this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToArchive);
+      } finally {
+        lock.writeLock().unlock();
+      }
+    }
+  }
 }
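The new removeCompactedFiles path above closes each candidate on a worker thread and only archives files that are both marked compacted-away and no longer referenced by any scanner; anything still referenced is simply retried on a later chore run. A simplified, self-contained sketch of that close-then-archive flow; the CandidateFile interface and class names are hypothetical stand-ins, not HBase types:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    interface CandidateFile {
      boolean isCompactedAway();
      boolean isReferencedInReads();
      void closeReader();               // also evicts the file's blocks from cache
    }

    class CompactedFileArchiver {
      private final ExecutorService pool = Executors.newFixedThreadPool(4);

      // Returns only the files that were actually closed and are safe to archive.
      List<CandidateFile> closeUnreferenced(Collection<CandidateFile> candidates)
          throws InterruptedException {
        CompletionService<CandidateFile> cs = new ExecutorCompletionService<>(pool);
        for (CandidateFile f : candidates) {
          cs.submit(() -> {
            if (f.isCompactedAway() && !f.isReferencedInReads()) {
              f.closeReader();
              return f;                 // safe to archive
            }
            return null;                // still in use; leave for a later attempt
          });
        }
        List<CandidateFile> closed = new ArrayList<>();
        for (int i = 0; i < candidates.size(); i++) {
          try {
            CandidateFile done = cs.take().get();
            if (done != null) {
              closed.add(done);
            }
          } catch (ExecutionException e) {
            // skip this file; it stays in the compacted list for the next run
          }
        }
        return closed;
      }
    }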

ReversedStoreScanner.java

@@ -123,24 +123,13 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
 
   @Override
   public boolean seekToPreviousRow(Cell key) throws IOException {
-    lock.lock();
-    try {
     checkReseek();
     return this.heap.seekToPreviousRow(key);
-    } finally {
-      lock.unlock();
-    }
   }
 
   @Override
   public boolean backwardSeek(Cell key) throws IOException {
-    lock.lock();
-    try {
     checkReseek();
     return this.heap.backwardSeek(key);
-    } finally {
-      lock.unlock();
-    }
   }
 }

Store.java

@@ -466,4 +466,9 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
   void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException;
 
   boolean isPrimaryReplicaStore();
+
+  /**
+   * Closes and archives the compacted files under this store
+   */
+  void closeAndArchiveCompactedFiles() throws IOException;
 }

StoreFile.java

@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.SortedSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

@@ -61,6 +63,7 @@ import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;

@@ -378,6 +381,19 @@ public class StoreFile {
     return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
   }
 
+  @VisibleForTesting
+  public boolean isCompactedAway() {
+    if (this.reader != null) {
+      return this.reader.isCompactedAway();
+    }
+    return true;
+  }
+
+  @VisibleForTesting
+  public int getRefCount() {
+    return this.reader.refCount.get();
+  }
+
   /**
    * Return the timestamp at which this bulk load file was generated.
    */

@@ -552,6 +568,15 @@ public class StoreFile {
     }
   }
 
+  /**
+   * Marks the status of the file as compactedAway.
+   */
+  public void markCompactedAway() {
+    if (this.reader != null) {
+      this.reader.markCompactedAway();
+    }
+  }
+
   /**
    * Delete this file
    * @throws IOException

@@ -1137,6 +1162,12 @@ public class StoreFile {
     private boolean bulkLoadResult = false;
     private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
     private boolean skipResetSeqId = true;
+    // Counter that is incremented every time a scanner is created on the
+    // store file. It is decremented when the scan on the store file is
+    // done.
+    private AtomicInteger refCount = new AtomicInteger(0);
+    // Indicates if the file got compacted
+    private volatile boolean compactedAway = false;
 
     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
         throws IOException {

@@ -1144,6 +1175,10 @@ public class StoreFile {
       bloomFilterType = BloomType.NONE;
     }
 
+    void markCompactedAway() {
+      this.compactedAway = true;
+    }
+
     public Reader(FileSystem fs, Path path, FSDataInputStreamWrapper in, long size,
         CacheConfig cacheConf, Configuration conf) throws IOException {
       reader = HFile.createReader(fs, path, in, size, cacheConf, conf);

@@ -1195,11 +1230,35 @@ public class StoreFile {
     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
                                                 boolean pread,
                                                 boolean isCompaction, long readPt) {
+      // Increment the ref count
+      refCount.incrementAndGet();
       return new StoreFileScanner(this,
           getScanner(cacheBlocks, pread, isCompaction),
           !isCompaction, reader.hasMVCCInfo(), readPt);
     }
 
+    /**
+     * Decrement the ref count associated with the reader when ever a scanner associated
+     * with the reader is closed
+     */
+    void decrementRefCount() {
+      refCount.decrementAndGet();
+    }
+
+    /**
+     * @return true if the file is still used in reads
+     */
+    public boolean isReferencedInReads() {
+      return refCount.get() != 0;
+    }
+
+    /**
+     * @return true if the file is compacted
+     */
+    public boolean isCompactedAway() {
+      return this.compactedAway;
+    }
+
     /**
      * Warning: Do not write further code which depends on this call. Instead
      * use getStoreFileScanner() which uses the StoreFileScanner class/interface

@@ -1710,7 +1769,13 @@ public class StoreFile {
   private static class GetFileSize implements Function<StoreFile, Long> {
     @Override
     public Long apply(StoreFile sf) {
+      if (sf.getReader() != null) {
         return sf.getReader().length();
+      } else {
+        // the reader may be null for the compacted files and if the archiving
+        // had failed.
+        return -1L;
+      }
     }
   }
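The Reader now carries the two pieces of state the background archiver keys off: a scanner reference count and a compacted-away flag. A stripped-down sketch of that protocol, with illustrative names rather than the actual StoreFile.Reader API:

    import java.util.concurrent.atomic.AtomicInteger;

    // Every scanner bumps the count when it opens and drops it when it closes,
    // so the archiver can tell whether a compacted file is still being read.
    class TrackedReader {
      private final AtomicInteger refCount = new AtomicInteger(0);
      private volatile boolean compactedAway = false;

      void markCompactedAway() {
        compactedAway = true;
      }

      // Called when a scanner is opened on this reader.
      void onScannerOpen() {
        refCount.incrementAndGet();
      }

      // Called from the scanner's close().
      void onScannerClose() {
        refCount.decrementAndGet();
      }

      boolean isReferencedInReads() {
        return refCount.get() != 0;
      }

      // The archiver only touches readers that are compacted away and unreferenced.
      boolean safeToArchive() {
        return compactedAway && !isReferencedInReads();
      }
    }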

StoreFileManager.java

@@ -53,19 +53,33 @@ public interface StoreFileManager {
   void insertNewFiles(Collection<StoreFile> sfs) throws IOException;
 
   /**
-   * Adds compaction results into the structure.
+   * Adds only the new compaction results into the structure.
    * @param compactedFiles The input files for the compaction.
    * @param results The resulting files for the compaction.
    */
   void addCompactionResults(
       Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException;
 
+  /**
+   * Remove the compacted files
+   * @param compactedFiles the list of compacted files
+   * @throws IOException
+   */
+  void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException;
+
   /**
    * Clears all the files currently in use and returns them.
    * @return The files previously in use.
    */
   ImmutableCollection<StoreFile> clearFiles();
 
+  /**
+   * Clears all the compacted files and returns them. This method is expected to be
+   * accessed single threaded.
+   * @return The files compacted previously.
+   */
+  Collection<StoreFile> clearCompactedFiles();
+
   /**
    * Gets the snapshot of the store files currently in use. Can be used for things like metrics
    * and checks; should not assume anything about relations between store files in the list.

@@ -73,6 +87,15 @@ public interface StoreFileManager {
    */
   Collection<StoreFile> getStorefiles();
 
+  /**
+   * List of compacted files inside this store that needs to be excluded in reads
+   * because further new reads will be using only the newly created files out of compaction.
+   * These compacted files will be deleted/cleared once all the existing readers on these
+   * compacted files are done.
+   * @return the list of compacted files
+   */
+  Collection<StoreFile> getCompactedfiles();
+
   /**
    * Returns the number of files currently in use.
    * @return The number of files.

StoreFileScanner.java

@@ -248,6 +248,9 @@ public class StoreFileScanner implements KeyValueScanner {
   public void close() {
     cur = null;
     this.hfs.close();
+    if (this.reader != null) {
+      this.reader.decrementRefCount();
+    }
   }
 
   /**

StoreScanner.java

@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

@@ -125,7 +124,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   // A flag whether use pread for scan
   private boolean scanUsePread = false;
-  protected ReentrantLock lock = new ReentrantLock();
+  // Indicates whether there was flush during the course of the scan
+  protected volatile boolean flushed = false;
 
   protected final long readPt;
 

@@ -403,15 +403,11 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public Cell peek() {
-    lock.lock();
-    try {
+    checkResetHeap();
     if (this.heap == null) {
       return this.lastTop;
     }
     return this.heap.peek();
-    } finally {
-      lock.unlock();
-    }
   }
 
   @Override

@@ -426,8 +422,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   }
 
   private void close(boolean withHeapClose) {
-    lock.lock();
-    try {
     if (this.closing) {
       return;
     }

@@ -450,21 +444,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       }
     }
     this.lastTop = null; // If both are null, we are closed.
-    } finally {
-      lock.unlock();
-    }
   }
 
   @Override
   public boolean seek(Cell key) throws IOException {
-    lock.lock();
-    try {
+    checkResetHeap();
     // reset matcher state, in case that underlying store changed
     checkReseek();
     return this.heap.seek(key);
-    } finally {
-      lock.unlock();
-    }
   }
 
   @Override

@@ -480,11 +467,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    */
   @Override
   public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
-    lock.lock();
-    try {
     if (scannerContext == null) {
       throw new IllegalArgumentException("Scanner context cannot be null");
     }
+    checkResetHeap();
     if (checkReseek()) {
       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
     }

@@ -509,8 +495,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing
     // rows. Else it is possible we are still traversing the same row so we must perform the row
     // comparison.
-    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null ||
-        !CellUtil.matchingRow(cell, matcher.curCell)) {
+    if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null
+        || !CellUtil.matchingRow(cell, matcher.curCell)) {
       this.countPerRow = 0;
       matcher.setToNewRow(cell);
     }

@@ -550,8 +536,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
 
     this.countPerRow++;
-    if (storeLimit > -1 &&
-        this.countPerRow > (storeLimit + storeOffset)) {
+    if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) {
       // do what SEEK_NEXT_ROW does.
       if (!matcher.moreRowsMayExistAfter(cell)) {
         return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();

@@ -574,8 +559,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       scannerContext.incrementBatchProgress(1);
 
       if (totalBytesRead > maxRowSize) {
-        throw new RowTooBigException("Max row size allowed: " + maxRowSize
-            + ", but the row is bigger than that.");
+        throw new RowTooBigException(
+            "Max row size allowed: " + maxRowSize + ", but the row is bigger than that.");
       }
     }

@@ -644,9 +629,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     // No more keys
     close(false);// Do all cleanup except heap.close()
     return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
-    } finally {
-      lock.unlock();
-    }
   }
 
   /*

@@ -684,30 +666,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   // Implementation of ChangedReadersObserver
   @Override
   public void updateReaders() throws IOException {
-    lock.lock();
-    try {
-    if (this.closing) return;
+    flushed = true;
+    // Let the next() call handle re-creating and seeking
+  }
+
+  protected void nullifyCurrentHeap() {
+    if (this.closing) return;
     // All public synchronized API calls will call 'checkReseek' which will cause
     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
     if (this.heap == null) return;
 
     // this could be null.
-    this.lastTop = this.peek();
+    this.lastTop = this.heap.peek();
 
     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
 
     // close scanners to old obsolete Store files
     this.heapsForDelayedClose.add(this.heap);// Don't close now. Delay it till StoreScanner#close
     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
-
-    // Let the next() call handle re-creating and seeking
-    } finally {
-      lock.unlock();
-    }
   }
 
   /**

@@ -793,8 +771,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public boolean reseek(Cell kv) throws IOException {
-    lock.lock();
-    try {
+    checkResetHeap();
     // Heap will not be null, if this is called from next() which.
     // If called from RegionScanner.reseek(...) make sure the scanner
     // stack is reset if needed.

@@ -803,8 +780,24 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       return heap.requestSeek(kv, true, useRowColBloom);
     }
     return heap.reseek(kv);
-    } finally {
-      lock.unlock();
+  }
+
+  protected void checkResetHeap() {
+    // check the var without any lock. Suppose even if we see the old
+    // value here still it is ok to continue because we will not be resetting
+    // the heap but will continue with the referenced memstore's snapshot. For compactions
+    // any way we don't need the updateReaders at all to happen as we still continue with
+    // the older files
+    if (flushed) {
+      // If the 'flushed' is found to be true then there is a need to ensure
+      // that the current scanner updates the heap that it has and then proceed
+      // with the scan and ensure to reset the flushed inside the lock
+      // One thing can be sure that the same store scanner cannot be in reseek and
+      // next at the same time ie. within the same store scanner it is always single
+      // threaded
+      nullifyCurrentHeap();
+      // reset the flag
+      flushed = false;
     }
   }
 

@@ -883,8 +876,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @Override
   public void shipped() throws IOException {
-    lock.lock();
-    try {
     for (KeyValueHeap h : this.heapsForDelayedClose) {
       h.close();// There wont be further fetch of Cells from these scanners. Just close.
     }

@@ -892,9 +883,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (this.heap != null) {
       this.heap.shipped();
     }
-    } finally {
-      lock.unlock();
-    }
   }
 }
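Instead of taking a ReentrantLock in every scanner call, the scanner above only records that a flush happened (flushed = true) and rebuilds its heap lazily at the start of the next read via checkResetHeap(); this works because, as the comments note, a single StoreScanner is never driven by two threads at once. A small sketch of that hand-off, with illustrative types standing in for the scanner and its KeyValueHeap:

    // The flush thread only flips a volatile flag; the scan thread does the real work
    // the next time it reads, so no lock is needed on the hot read path.
    class FlushAwareScanner {
      private volatile boolean flushed = false;
      private Object heap = new Object();   // stand-in for the KeyValueHeap

      // Called by the flush path (a different thread): just flag the change.
      void updateReaders() {
        flushed = true;
      }

      // Called at the top of every read operation on the scan thread.
      private void checkResetHeap() {
        if (flushed) {
          heap = rebuildHeapFromCurrentFiles();
          flushed = false;
        }
      }

      Object peek() {
        checkResetHeap();
        return heap;
      }

      private Object rebuildHeapFromCurrentFiles() {
        // open scanners on the post-flush set of store files plus the memstore snapshot
        return new Object();
      }
    }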
@ -106,6 +106,7 @@ public class StripeStoreFileManager
|
||||||
|
|
||||||
/** Cached list of all files in the structure, to return from some calls */
|
/** Cached list of all files in the structure, to return from some calls */
|
||||||
public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
|
public ImmutableList<StoreFile> allFilesCached = ImmutableList.<StoreFile>of();
|
||||||
|
private ImmutableList<StoreFile> allCompactedFilesCached = ImmutableList.<StoreFile>of();
|
||||||
}
|
}
|
||||||
private State state = null;
|
private State state = null;
|
||||||
|
|
||||||
|
@ -140,9 +141,15 @@ public class StripeStoreFileManager
|
||||||
return state.allFilesCached;
|
return state.allFilesCached;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<StoreFile> getCompactedfiles() {
|
||||||
|
return state.allCompactedFilesCached;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
|
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
|
||||||
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
|
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
|
||||||
|
// Passing null does not cause NPE??
|
||||||
cmc.mergeResults(null, sfs);
|
cmc.mergeResults(null, sfs);
|
||||||
debugDumpState("Added new files");
|
debugDumpState("Added new files");
|
||||||
}
|
}
|
||||||
|
@ -156,6 +163,13 @@ public class StripeStoreFileManager
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ImmutableCollection<StoreFile> clearCompactedFiles() {
|
||||||
|
ImmutableCollection<StoreFile> result = state.allCompactedFilesCached;
|
||||||
|
this.state = new State();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getStorefileCount() {
|
public int getStorefileCount() {
|
||||||
return state.allFilesCached.size();
|
return state.allFilesCached.size();
|
||||||
|
@ -306,9 +320,31 @@ public class StripeStoreFileManager
     // copies and apply the result at the end.
     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
     cmc.mergeResults(compactedFiles, results);
+    markCompactedAway(compactedFiles);
     debugDumpState("Merged compaction results");
   }

+  // Mark the files as compactedAway once the storefiles and compactedfiles list is finalised
+  // Let a background thread close the actual reader on these compacted files and also
+  // ensure to evict the blocks from block cache so that they are no longer in
+  // cache
+  private void markCompactedAway(Collection<StoreFile> compactedFiles) {
+    for (StoreFile file : compactedFiles) {
+      file.markCompactedAway();
+    }
+  }
+
+  @Override
+  public void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException {
+    // See class comment for the assumptions we make here.
+    LOG.debug("Attempting to delete compaction results: " + compactedFiles.size());
+    // In order to be able to fail in the middle of the operation, we'll operate on lazy
+    // copies and apply the result at the end.
+    CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
+    cmc.deleteResults(compactedFiles);
+    debugDumpState("Deleted compaction results");
+  }
+
   @Override
   public int getStoreCompactionPriority() {
     // If there's only L0, do what the default store does.
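The comment block above fixes the ordering: the new storefiles/compactedfiles state is published first, and only then are the replaced files flagged compactedAway, so a concurrent reader never finds a flagged file in the active list; closing readers and evicting blocks is left to a background pass that ends in removeCompactedFiles(). A rough sketch of that ordering follows; publishNewState() is a hypothetical stand-in, not an HBase method, and same-package visibility is assumed for markCompactedAway():

    import java.io.IOException;
    import java.util.Collection;

    import org.apache.hadoop.hbase.regionserver.StoreFile;

    abstract class CompactionResultOrdering {
      // Hypothetical: swap the manager's volatile storefiles/compactedfiles state.
      abstract void publishNewState(Collection<StoreFile> compactedFiles,
          Collection<StoreFile> results) throws IOException;

      final void addCompactionResults(Collection<StoreFile> compactedFiles,
          Collection<StoreFile> results) throws IOException {
        // 1. Publish the new lists first.
        publishNewState(compactedFiles, results);
        // 2. Only then flag the replaced files; readers that already hold them keep
        //    working, and the background discharger closes and archives them later,
        //    finally calling removeCompactedFiles() on the manager.
        for (StoreFile file : compactedFiles) {
          file.markCompactedAway();
        }
      }
    }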
@ -684,7 +720,7 @@ public class StripeStoreFileManager
       this.isFlush = isFlush;
     }

-    public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
+    private void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
         throws IOException {
       assert this.compactedFiles == null && this.results == null;
       this.compactedFiles = compactedFiles;
@ -696,12 +732,20 @@ public class StripeStoreFileManager
         processNewCandidateStripes(newStripes);
       }
       // Create new state and update parent.
-      State state = createNewState();
+      State state = createNewState(false);
       StripeStoreFileManager.this.state = state;
       updateMetadataMaps();
     }

-    private State createNewState() {
+    private void deleteResults(Collection<StoreFile> compactedFiles) throws IOException {
+      this.compactedFiles = compactedFiles;
+      // Create new state and update parent.
+      State state = createNewState(true);
+      StripeStoreFileManager.this.state = state;
+      updateMetadataMaps();
+    }
+
+    private State createNewState(boolean delCompactedFiles) {
       State oldState = StripeStoreFileManager.this.state;
       // Stripe count should be the same unless the end rows changed.
       assert oldState.stripeFiles.size() == this.stripeFiles.size() || this.stripeEndRows != null;
@ -717,9 +761,21 @@ public class StripeStoreFileManager
       }

       List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
-      if (!isFlush) newAllFiles.removeAll(compactedFiles);
+      List<StoreFile> newAllCompactedFiles =
+          new ArrayList<StoreFile>(oldState.allCompactedFilesCached);
+      if (!isFlush) {
+        newAllFiles.removeAll(compactedFiles);
+        if (delCompactedFiles) {
+          newAllCompactedFiles.removeAll(compactedFiles);
+        } else {
+          newAllCompactedFiles.addAll(compactedFiles);
+        }
+      }
+      if (results != null) {
         newAllFiles.addAll(results);
+      }
       newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
+      newState.allCompactedFilesCached = ImmutableList.copyOf(newAllCompactedFiles);
       return newState;
     }
@ -970,16 +1026,18 @@ public class StripeStoreFileManager
       // Order by seqnum is reversed.
       for (int i = 1; i < stripe.size(); ++i) {
         StoreFile sf = stripe.get(i);
+        synchronized (sf) {
           long fileTs = sf.getReader().getMaxTimestamp();
           if (fileTs < maxTs && !filesCompacting.contains(sf)) {
-            LOG.info("Found an expired store file: " + sf.getPath()
-                + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs);
+            LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimeStamp is "
+                + fileTs + ", which is below " + maxTs);
             if (expiredStoreFiles == null) {
               expiredStoreFiles = new ArrayList<StoreFile>();
             }
             expiredStoreFiles.add(sf);
           }
         }
+      }
     return expiredStoreFiles;
   }
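The synchronized (sf) block added above guards sf.getReader() while the expiry check reads the max timestamp, since a background discharger may now close that reader concurrently; presumably the closing side synchronizes on the same StoreFile instance. A tiny sketch of the pattern (illustrative names, and the null check is an assumption):

    import java.io.IOException;

    import org.apache.hadoop.hbase.regionserver.StoreFile;

    final class StoreFileReaderGuard {
      // Both sides synchronize on the StoreFile instance, so the reader is never
      // used while (or after) it is being closed. Illustrative only.
      static long maxTimestampOrMinusOne(StoreFile sf) {
        synchronized (sf) {
          return sf.getReader() == null ? -1L : sf.getReader().getMaxTimestamp();
        }
      }

      static void closeWithEviction(StoreFile sf) throws IOException {
        synchronized (sf) {
          sf.closeReader(true); // true: also evict the file's blocks from the block cache
        }
      }
    }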
@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * A chore service that periodically cleans up the compacted files when there are no active readers
+ * using those compacted files and also helps in clearing the block cache with these compacted
+ * file entries
+ */
+@InterfaceAudience.Private
+public class CompactedHFilesDischarger extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(CompactedHFilesDischarger.class);
+  private Region region;
+
+  /**
+   * @param period the period of time to sleep between each run
+   * @param stopper the stopper
+   * @param region the store to identify the family name
+   */
+  public CompactedHFilesDischarger(final int period, final Stoppable stopper, final Region region) {
+    // Need to add the config classes
+    super("CompactedHFilesCleaner", stopper, period);
+    this.region = region;
+  }
+
+  @Override
+  public void chore() {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(
+          "Started the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
+    }
+    for (Store store : region.getStores()) {
+      try {
+        store.closeAndArchiveCompactedFiles();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Completed archiving the compacted files for the region "
+              + this.region.getRegionInfo() + " under the store " + store.getColumnFamilyName());
+        }
+      } catch (Exception e) {
+        LOG.error(
+            "Exception while trying to close and archive the compacted store files of the store "
+                + store.getColumnFamilyName() + " in the region " + this.region.getRegionInfo(),
+            e);
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(
+          "Completed the compacted hfiles cleaner for the region " + this.region.getRegionInfo());
+    }
+  }
+}
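The tests in this patch drive the discharger by calling chore() directly; in a running server it would be scheduled through a ChoreService, roughly as sketched below. The wiring, period and stopper values are illustrative, not taken from the patch:

    import org.apache.hadoop.hbase.ChoreService;
    import org.apache.hadoop.hbase.Stoppable;
    import org.apache.hadoop.hbase.regionserver.Region;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;

    final class DischargerWiring {
      // Schedule the discharger to run every periodMs milliseconds for one region.
      static void schedule(ChoreService choreService, Stoppable stopper, Region region, int periodMs) {
        choreService.scheduleChore(new CompactedHFilesDischarger(periodMs, stopper, region));
      }
    }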
@ -197,17 +197,6 @@ public class TestIOFencing {
       r = (CompactionBlockerRegion) region;
     }

-    @Override
-    protected void completeCompaction(final Collection<StoreFile> compactedFiles,
-        boolean removeFiles) throws IOException {
-      try {
-        r.compactionsWaiting.countDown();
-        r.compactionsBlocked.await();
-      } catch (InterruptedException ex) {
-        throw new IOException(ex);
-      }
-      super.completeCompaction(compactedFiles, removeFiles);
-    }
     @Override
     protected void completeCompaction(Collection<StoreFile> compactedFiles) throws IOException {
       try {
@ -42,8 +42,10 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;

@ -170,10 +172,11 @@ public class TestZooKeeperTableArchiveClient {
     // create the region
     HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
-    Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+    final CompactedHFilesDischarger compactionCleaner =
+        new CompactedHFilesDischarger(100, stop, region);
     loadFlushAndCompact(region, TEST_FAM);
+    compactionCleaner.chore();
     // get the current hfiles in the archive directory
     List<Path> files = getAllFiles(fs, archiveDir);
     if (files == null) {

@ -217,18 +220,22 @@ public class TestZooKeeperTableArchiveClient {
     HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop);
     List<BaseHFileCleanerDelegate> cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner);
     final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0);

     // create the region
     HColumnDescriptor hcd = new HColumnDescriptor(TEST_FAM);
-    Region region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+    HRegion region = UTIL.createTestRegion(STRING_TABLE_NAME, hcd);
+    final CompactedHFilesDischarger compactionCleaner =
+        new CompactedHFilesDischarger(100, stop, region);
     loadFlushAndCompact(region, TEST_FAM);
+    compactionCleaner.chore();
     // create the another table that we don't archive
     hcd = new HColumnDescriptor(TEST_FAM);
-    Region otherRegion = UTIL.createTestRegion(otherTable, hcd);
+    HRegion otherRegion = UTIL.createTestRegion(otherTable, hcd);
+    final CompactedHFilesDischarger compactionCleaner1 =
+        new CompactedHFilesDischarger(100, stop, otherRegion);
     loadFlushAndCompact(otherRegion, TEST_FAM);
+    compactionCleaner1.chore();
     // get the current hfiles in the archive directory
+    // Should be archived
     List<Path> files = getAllFiles(fs, archiveDir);
     if (files == null) {
       FSUtils.logFileSystemState(fs, archiveDir, LOG);
@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;

@ -74,6 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.testclassification.LargeTests;

@ -95,6 +97,8 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 import org.mockito.Mockito;

+import com.google.common.collect.Lists;
+
 /**
  * Simple test for {@link CellSortReducer} and {@link HFileOutputFormat2}.
  * Sets up and runs a mapreduce job that writes hfile output.

@ -1002,6 +1006,12 @@ public class TestHFileOutputFormat2 {
       quickPoll(new Callable<Boolean>() {
         @Override
         public Boolean call() throws Exception {
+          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+          for (HRegion region : regions) {
+            for (Store store : region.getStores()) {
+              store.closeAndArchiveCompactedFiles();
+            }
+          }
           return fs.listStatus(storePath).length == 1;
         }
       }, 5000);

@ -1015,6 +1025,12 @@ public class TestHFileOutputFormat2 {
       quickPoll(new Callable<Boolean>() {
         @Override
         public Boolean call() throws Exception {
+          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAME);
+          for (HRegion region : regions) {
+            for (Store store : region.getStores()) {
+              store.closeAndArchiveCompactedFiles();
+            }
+          }
           return fs.listStatus(storePath).length == 1;
         }
       }, 5000);
@ -48,6 +48,9 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneReq
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;

@ -121,6 +124,7 @@ public class TestSnapshotFromMaster {
     conf.setLong(SnapshotHFileCleaner.HFILE_CACHE_REFRESH_PERIOD_CONF_KEY, cacheRefreshPeriod);
     conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
         ConstantSizeRegionSplitPolicy.class.getName());
+    conf.setInt("hbase.hfile.compactions.cleaner.interval", 20 * 1000);
   }

@ -320,6 +324,10 @@ public class TestSnapshotFromMaster {
       region.waitForFlushesAndCompactions(); // enable can trigger a compaction, wait for it.
       region.compactStores(); // min is 2 so will compact and archive
     }
+    for (HRegion region : regions) {
+      CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
+      cleaner.chore();
+    }
     LOG.info("After compaction File-System state");
     FSUtils.logFileSystemState(fs, rootDir, LOG);
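The test above sets "hbase.hfile.compactions.cleaner.interval", but the discharger's constructor still carries a "Need to add the config classes" note, so the knob does not appear to be read anywhere in this patch yet. If it were wired up, the read would presumably look like the sketch below (the default value is an assumption, not from the patch):

    import org.apache.hadoop.conf.Configuration;

    final class DischargerIntervalConfig {
      // Hypothetical read of the interval the test configures.
      static int intervalMillis(Configuration conf) {
        return conf.getInt("hbase.hfile.compactions.cleaner.interval", 2 * 60 * 1000);
      }
    }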
@ -78,6 +78,11 @@ public class MockStoreFile extends StoreFile {
     return false;
   }

+  @Override
+  public boolean isCompactedAway() {
+    return false;
+  }
+
   @Override
   public byte[] getMetadataValue(byte[] key) {
     return this.metadata.get(key);
@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver;

 import static org.junit.Assert.*;

+import java.io.IOException;
 import java.security.Key;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;

 import javax.crypto.spec.SecretKeySpec;

@ -123,13 +125,14 @@ public class TestEncryptionKeyRotation {

     // And major compact
     TEST_UTIL.getHBaseAdmin().majorCompact(htd.getTableName());
+    final List<Path> updatePaths = findCompactedStorefilePaths(htd.getTableName());
     TEST_UTIL.waitFor(30000, 1000, true, new Predicate<Exception>() {
       @Override
       public boolean evaluate() throws Exception {
         // When compaction has finished, all of the original files will be
         // gone
         boolean found = false;
-        for (Path path: initialPaths) {
+        for (Path path: updatePaths) {
           found = TEST_UTIL.getTestFileSystem().exists(path);
           if (found) {
             LOG.info("Found " + path);

@ -141,14 +144,20 @@ public class TestEncryptionKeyRotation {
     });

     // Verify we have store file(s) with only the new key
+    Thread.sleep(1000);
+    waitForCompaction(htd.getTableName());
     List<Path> pathsAfterCompaction = findStorefilePaths(htd.getTableName());
     assertTrue(pathsAfterCompaction.size() > 0);
     for (Path path: pathsAfterCompaction) {
-      assertFalse("Store file " + path + " retains initial key",
-        Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
       assertTrue("Store file " + path + " has incorrect key",
         Bytes.equals(secondCFKey.getEncoded(), extractHFileKey(path)));
     }
+    List<Path> compactedPaths = findCompactedStorefilePaths(htd.getTableName());
+    assertTrue(compactedPaths.size() > 0);
+    for (Path path: compactedPaths) {
+      assertTrue("Store file " + path + " retains initial key",
+        Bytes.equals(initialCFKey.getEncoded(), extractHFileKey(path)));
+    }
   }

   @Test

@ -194,6 +203,33 @@ public class TestEncryptionKeyRotation {
     }
   }

+  private static void waitForCompaction(TableName tableName)
+      throws IOException, InterruptedException {
+    boolean compacted = false;
+    for (Region region : TEST_UTIL.getRSForFirstRegionInTable(tableName)
+        .getOnlineRegions(tableName)) {
+      for (Store store : region.getStores()) {
+        compacted = false;
+        while (!compacted) {
+          if (store.getStorefiles() != null) {
+            while (store.getStorefilesCount() != 1) {
+              Thread.sleep(100);
+            }
+            for (StoreFile storefile : store.getStorefiles()) {
+              if (!storefile.isCompactedAway()) {
+                compacted = true;
+                break;
+              }
+              Thread.sleep(100);
+            }
+          } else {
+            break;
+          }
+        }
+      }
+    }
+  }
+
   private static List<Path> findStorefilePaths(TableName tableName) throws Exception {
     List<Path> paths = new ArrayList<Path>();
     for (Region region:

@ -207,6 +243,23 @@ public class TestEncryptionKeyRotation {
     return paths;
   }

+  private static List<Path> findCompactedStorefilePaths(TableName tableName) throws Exception {
+    List<Path> paths = new ArrayList<Path>();
+    for (Region region:
+        TEST_UTIL.getRSForFirstRegionInTable(tableName).getOnlineRegions(tableName)) {
+      for (Store store : region.getStores()) {
+        Collection<StoreFile> compactedfiles =
+            ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
+        if (compactedfiles != null) {
+          for (StoreFile storefile : compactedfiles) {
+            paths.add(storefile.getPath());
+          }
+        }
+      }
+    }
+    return paths;
+  }
+
   private void createTableAndFlush(HTableDescriptor htd) throws Exception {
     HColumnDescriptor hcd = htd.getFamilies().iterator().next();
     // Create the test table
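The new waitForCompaction() helper above polls until a store is down to one file and then treats the first visible file that is not compactedAway as proof that the compaction result is in place. A more direct statement of that intent is sketched below; this is an illustration only, not a proposed replacement for the patch:

    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.StoreFile;

    final class CompactionWaitSketch {
      // Wait until exactly one store file remains and none of the visible files
      // are still flagged compacted-away. Illustrative only.
      static void waitForSingleLiveFile(Store store) throws InterruptedException {
        while (true) {
          if (store.getStorefilesCount() == 1) {
            boolean anyCompactedAway = false;
            for (StoreFile sf : store.getStorefiles()) {
              if (sf.isCompactedAway()) {
                anyCompactedAway = true;
                break;
              }
            }
            if (!anyCompactedAway) {
              return;
            }
          }
          Thread.sleep(100);
        }
      }
    }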
@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescripto
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
 import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.MediumTests;

@ -1369,6 +1370,8 @@ public class TestHRegionReplayEvents {

     // Test case 3: compact primary files
     primaryRegion.compactStores();
+    CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, primaryRegion);
+    cleaner.chore();
     secondaryRegion.refreshStoreFiles();
     assertPathListsEqual(primaryRegion.getStoreFileList(families),
       secondaryRegion.getStoreFileList(families));
@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;

@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;

@ -212,7 +214,7 @@ public class TestRegionMergeTransactionOnCluster {
     List<Pair<HRegionInfo, ServerName>> tableRegions = MetaTableAccessor
         .getTableRegionsAndLocations(master.getConnection(), tableName);
     HRegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
-    HTableDescriptor tableDescritor = master.getTableDescriptors().get(
+    HTableDescriptor tableDescriptor = master.getTableDescriptors().get(
         tableName);
     Result mergedRegionResult = MetaTableAccessor.getRegionResult(
         master.getConnection(), mergedRegionInfo.getRegionName());

@ -236,19 +238,34 @@ public class TestRegionMergeTransactionOnCluster {
     assertTrue(fs.exists(regionAdir));
     assertTrue(fs.exists(regionBdir));

+    HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
+    HRegionFileSystem hrfs = new HRegionFileSystem(
+        TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
+    int count = 0;
+    for(HColumnDescriptor colFamily : columnFamilies) {
+      count += hrfs.getStoreFiles(colFamily.getName()).size();
+    }
     admin.compactRegion(mergedRegionInfo.getRegionName());
     // wait until merged region doesn't have reference file
     long timeout = System.currentTimeMillis() + waitTime;
-    HRegionFileSystem hrfs = new HRegionFileSystem(
-        TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
     while (System.currentTimeMillis() < timeout) {
-      if (!hrfs.hasReferences(tableDescritor)) {
+      if (!hrfs.hasReferences(tableDescriptor)) {
         break;
       }
       Thread.sleep(50);
     }
-    assertFalse(hrfs.hasReferences(tableDescritor));
+    int newcount = 0;
+    for(HColumnDescriptor colFamily : columnFamilies) {
+      newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+    }
+    assertTrue(newcount > count);
+    // clean up the merged region store files
+    List<HRegion> regions =
+        TEST_UTIL.getHBaseCluster().getRegions(tableDescriptor.getName());
+    for (HRegion region : regions) {
+      CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, region);
+      cleaner.chore();
+    }
     // run CatalogJanitor to clean merge references in hbase:meta and archive the
     // files of merging regions
     int cleaned = admin.runCatalogScan();
@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischarger;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DFSConfigKeys;

@ -248,6 +249,7 @@ public class TestRegionReplicas {
       LOG.info("Flushing primary region");
       Region region = getRS().getRegionByEncodedName(hriPrimary.getEncodedName());
       region.flush(true);
+      HRegion primaryRegion = (HRegion) region;

       // ensure that chore is run
       LOG.info("Sleeping for " + (4 * refreshPeriod));

@ -277,7 +279,7 @@ public class TestRegionReplicas {
       assertGetRpc(hriSecondary, 1042, true);
       assertGetRpc(hriSecondary, 2042, true);

-      // ensure that we are see the 3 store files
+      // ensure that we see the 3 store files
       Assert.assertEquals(3, secondaryRegion.getStore(f).getStorefilesCount());

       // force compaction

@ -292,7 +294,8 @@ public class TestRegionReplicas {
       }

       // ensure that we see the compacted file only
-      Assert.assertEquals(1, secondaryRegion.getStore(f).getStorefilesCount());
+      // This will be 4 until the cleaner chore runs
+      Assert.assertEquals(4, secondaryRegion.getStore(f).getStorefilesCount());

     } finally {
       HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, 0, 1000);

@ -451,7 +454,9 @@ public class TestRegionReplicas {
     LOG.info("Force Major compaction on primary region " + hriPrimary);
     primaryRegion.compact(true);
     Assert.assertEquals(1, primaryRegion.getStore(f).getStorefilesCount());
+    CompactedHFilesDischarger cleaner =
+        new CompactedHFilesDischarger(100, null, (HRegion) primaryRegion);
+    cleaner.chore();
     // scan all the hfiles on the secondary.
     // since there are no read on the secondary when we ask locations to
     // the NN a FileNotFound exception will be returned and the FileLink
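On the changed assertion above: the test flushes three files into the store and then forces a compaction, so until the discharger runs the secondary presumably sees the 1 compaction output plus the 3 originals that are merely marked compacted-away, that is 3 + 1 = 4 files; once the chore removes the 3 compacted files, 4 - 3 = 1 remains, which is what the old assertion expected.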
@ -1030,6 +1030,18 @@ public class TestStore {
     store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(), Lists.newArrayList(sf));
   }

+  private void closeCompactedFile(int index) throws IOException {
+    Collection<StoreFile> files =
+        this.store.getStoreEngine().getStoreFileManager().getCompactedfiles();
+    StoreFile sf = null;
+    Iterator<StoreFile> it = files.iterator();
+    for (int i = 0; i <= index; i++) {
+      sf = it.next();
+    }
+    sf.closeReader(true);
+    store.getStoreEngine().getStoreFileManager().removeCompactedFiles(Lists.newArrayList(sf));
+  }
+
   @Test
   public void testRefreshStoreFiles() throws Exception {
     init(name.getMethodName());

@ -1057,6 +1069,7 @@ public class TestStore {
     store.refreshStoreFiles();
     assertEquals(5, this.store.getStorefilesCount());

+    closeCompactedFile(0);
     archiveStoreFile(0);

     assertEquals(5, this.store.getStorefilesCount());
@ -137,6 +137,7 @@ public class TestStripeStoreFileManager {
     MockStoreFile stripe0a = createFile(0, 100, OPEN_KEY, KEY_B),
       stripe1 = createFile(KEY_B, OPEN_KEY);
     manager.addCompactionResults(al(l0File), al(stripe0a, stripe1));
+    manager.removeCompactedFiles(al(l0File));
     // If we want a key <= KEY_A, we should get everything except stripe1.
     ArrayList<StoreFile> sfsDump = dumpIterator(manager.getCandidateFilesForRowKeyBefore(KV_A));
     assertEquals(2, sfsDump.size());

@ -162,6 +163,7 @@ public class TestStripeStoreFileManager {
     // a candidate from the first file, the old one should not be removed.
     StoreFile stripe0b = createFile(0, 101, OPEN_KEY, KEY_B);
     manager.addCompactionResults(al(l0File2), al(stripe0b));
+    manager.removeCompactedFiles(al(l0File2));
     sfs = manager.getCandidateFilesForRowKeyBefore(KV_A);
     assertEquals(stripe0b, sfs.next());
     sfs.remove();

@ -350,10 +352,12 @@ public class TestStripeStoreFileManager {
     // Here, [B, C] is logically [B, inf), so we should be able to compact it to that only.
     verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
     manager.addCompactionResults(al(sf), al(createFile(KEY_B, OPEN_KEY)));
+    manager.removeCompactedFiles(al(sf));
     // Do the same for other variants.
     manager = createManager(al(sf, createFile(KEY_C, OPEN_KEY)));
     verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
     manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, KEY_C)));
+    manager.removeCompactedFiles(al(sf));
     manager = createManager(al(sf));
     verifyInvalidCompactionScenario(manager, al(sf), al(createFile(KEY_B, KEY_C)));
     manager.addCompactionResults(al(sf), al(createFile(OPEN_KEY, OPEN_KEY)));

@ -379,6 +383,7 @@ public class TestStripeStoreFileManager {
     StoreFile sf_B2C_0 = createFile(KEY_B, KEY_C);
     StoreFile sf_C2i_0 = createFile(KEY_C, OPEN_KEY);
     manager.addCompactionResults(al(sf_L0_0a), al(sf_i2B_0, sf_B2C_0, sf_C2i_0));
+    manager.removeCompactedFiles(al(sf_L0_0a));
     verifyAllFiles(manager, al(sf_L0_0b, sf_i2B_0, sf_B2C_0, sf_C2i_0));

     // Add another l0 file, "compact" both L0 into two stripes

@ -387,51 +392,61 @@ public class TestStripeStoreFileManager {
     StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C);
     manager.insertNewFiles(al(sf_L0_1));
     manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1));
+    manager.removeCompactedFiles(al(sf_L0_0b, sf_L0_1));
     verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1));

     // Try compacting with invalid file (no metadata) - should add files to L0.
     StoreFile sf_L0_2 = createFile(null, null);
     manager.addCompactionResults(al(), al(sf_L0_2));
+    manager.removeCompactedFiles(al());
     verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1, sf_L0_2));
     // Remove it...
     manager.addCompactionResults(al(sf_L0_2), al());
+    manager.removeCompactedFiles(al(sf_L0_2));

     // Do regular compaction in the first stripe.
     StoreFile sf_i2B_3 = createFile(OPEN_KEY, KEY_B);
     manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3));
+    manager.removeCompactedFiles(al(sf_i2B_0, sf_i2B_1));
     verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3));

     // Rebalance two stripes.
     StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D);
     StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY);
     manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4));
+    manager.removeCompactedFiles(al(sf_B2C_0, sf_C2i_0, sf_B2C_1));
     verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4));

     // Split the first stripe.
     StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A);
     StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B);
     manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5));
+    manager.removeCompactedFiles(al(sf_i2B_3));
     verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5));

     // Split the middle stripe.
     StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C);
     StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D);
     manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6));
+    manager.removeCompactedFiles(al(sf_B2D_4));
     verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6));

     // Merge two different middle stripes.
     StoreFile sf_A2C_7 = createFile(KEY_A, KEY_C);
     manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7));
+    manager.removeCompactedFiles(al(sf_A2B_5, sf_B2C_6));
     verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7));

     // Merge lower half.
     StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C);
     manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8));
+    manager.removeCompactedFiles(al(sf_i2A_5, sf_A2C_7));
     verifyAllFiles(manager, al(sf_D2i_4, sf_C2D_6, sf_i2C_8));

     // Merge all.
     StoreFile sf_i2i_9 = createFile(OPEN_KEY, OPEN_KEY);
     manager.addCompactionResults(al(sf_D2i_4, sf_C2D_6, sf_i2C_8), al(sf_i2i_9));
+    manager.removeCompactedFiles(al(sf_D2i_4, sf_C2D_6, sf_i2C_8));
     verifyAllFiles(manager, al(sf_i2i_9));
   }

@ -451,12 +466,14 @@ public class TestStripeStoreFileManager {
     verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i);
     // Remove these files.
     sfm.addCompactionResults(al(sf_i2d, sf_d2i), al());
+    sfm.removeCompactedFiles(al(sf_i2d, sf_d2i));
     assertEquals(0, sfm.getLevel0Files().size());
     // Add another file to stripe; then "rebalance" stripes w/o it - the file, which was
     // presumably flushed during compaction, should go to L0.
     StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C);
     sfm.insertNewFiles(al(sf_i2c_2));
     sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i));
+    sfm.removeCompactedFiles(al(sf_i2c, sf_c2i));
     assertEquals(1, sfm.getLevel0Files().size());
     verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2);
   }

@ -472,9 +489,11 @@ public class TestStripeStoreFileManager {
     ArrayList<StoreFile> compacted = al(createFile(OPEN_KEY, KEY_B),
       createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY));
     manager.addCompactionResults(al(sf0a), compacted);
+    manager.removeCompactedFiles(al(sf0a));
     // Next L0 compaction only produces file for the first and last stripe.
     ArrayList<StoreFile> compacted2 = al(createFile(OPEN_KEY, KEY_B), createFile(KEY_C, OPEN_KEY));
     manager.addCompactionResults(al(sf0b), compacted2);
+    manager.removeCompactedFiles(al(sf0b));
     compacted.addAll(compacted2);
     verifyAllFiles(manager, compacted);
   }
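Every addCompactionResults() in the stripe tests above now has a matching removeCompactedFiles() call, reflecting the new two-step contract: the first call makes the results visible while retaining the replaced files as compacted, the second drops them once nothing needs them. In compact form (a sketch; the helper class is not part of the patch):

    import java.io.IOException;
    import java.util.Collection;

    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;

    final class CompactAndDrop {
      static void compactAndDrop(StripeStoreFileManager manager,
          Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException {
        manager.addCompactionResults(compactedFiles, results); // new files visible, old ones retained as "compacted"
        manager.removeCompactedFiles(compactedFiles);          // old files dropped once no reader needs them
      }
    }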
@ -0,0 +1,389 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.compactions;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.Region;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.Store;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ MediumTests.class, RegionServerTests.class })
|
||||||
|
public class TestCompactedHFilesDischarger {
|
||||||
|
private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
|
||||||
|
private Region region;
|
||||||
|
private final static byte[] fam = Bytes.toBytes("cf_1");
|
||||||
|
private final static byte[] qual1 = Bytes.toBytes("qf_1");
|
||||||
|
private final static byte[] val = Bytes.toBytes("val");
|
||||||
|
private static CountDownLatch latch = new CountDownLatch(3);
|
||||||
|
private static AtomicInteger counter = new AtomicInteger(0);
|
||||||
|
private static AtomicInteger scanCompletedCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
TableName tableName = TableName.valueOf(getClass().getSimpleName());
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||||
|
htd.addFamily(new HColumnDescriptor(fam));
|
||||||
|
HRegionInfo info = new HRegionInfo(tableName, null, null, false);
|
||||||
|
Path path = testUtil.getDataTestDir(getClass().getSimpleName());
|
||||||
|
region = HBaseTestingUtility.createRegionAndWAL(info, path, testUtil.getConfiguration(), htd);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
counter.set(0);
|
||||||
|
scanCompletedCounter.set(0);
|
||||||
|
latch = new CountDownLatch(3);
|
||||||
|
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||||
|
testUtil.cleanupTestDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
  @Test
  public void testCompactedHFilesCleaner() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 11; i < 20; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 21; i < 30; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);

    Store store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<StoreFile> storefiles = store.getStorefiles();
    Collection<StoreFile> compactedfiles =
        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (StoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // Try to run the cleaner without a compaction; there should not be any change.
    cleaner.chore();
    storefiles = store.getStorefiles();
    // None of the files should be in compacted state.
    for (StoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // now do some compaction
    region.compact(true);
    // The flushed files should still be present until the cleaner runs, but they should
    // now be in COMPACTED state.
    assertEquals(1, store.getStorefilesCount());
    assertEquals(3,
      ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());

    // Run the cleaner
    cleaner.chore();
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (StoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.size() == 0);
  }

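  /*
   * Scanners are started only after the compaction, so they read from the single new file;
   * the three compacted files have no readers (refcount 0) and the chore can clear them
   * right away.
   */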
  @Test
  public void testCleanerWithParallelScannersAfterCompaction() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 11; i < 20; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 21; i < 30; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);

    Store store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<StoreFile> storefiles = store.getStorefiles();
    Collection<StoreFile> compactedfiles =
        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (StoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    // Do compaction
    region.compact(true);
    startScannerThreads();

    storefiles = store.getStorefiles();
    int usedReaderCount = 0;
    int unusedReaderCount = 0;
    for (StoreFile file : storefiles) {
      if (file.getRefCount() == 3) {
        usedReaderCount++;
      }
    }
    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (StoreFile file : compactedfiles) {
      assertEquals("Refcount should be 0", 0, file.getRefCount());
      unusedReaderCount++;
    }
    // The compacted files are still present, but no reads are using them.
    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
    assertEquals("used reader count should be 1", 1, usedReaderCount);
    // now run the cleaner
    cleaner.chore();
    countDown();
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (StoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.size() == 0);
  }

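  /*
   * Scanners are started before the compaction, so they pin the three old files (refcount 3
   * each) and the chore cannot clear them until those scans finish. Once the scans complete
   * and new scanners use only the file produced by the compaction, a later run of the chore
   * clears the old files.
   */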
  @Test
  public void testCleanerWithParallelScanners() throws Exception {
    // Create the cleaner object
    CompactedHFilesDischarger cleaner =
        new CompactedHFilesDischarger(1000, (Stoppable) null, (HRegion) region);
    // Add some data to the region and do some flushes
    for (int i = 1; i < 10; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 11; i < 20; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);
    for (int i = 21; i < 30; i++) {
      Put p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(fam, qual1, val);
      region.put(p);
    }
    // flush them
    region.flush(true);

    Store store = region.getStore(fam);
    assertEquals(3, store.getStorefilesCount());

    Collection<StoreFile> storefiles = store.getStorefiles();
    Collection<StoreFile> compactedfiles =
        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    // None of the files should be in compacted state.
    for (StoreFile file : storefiles) {
      assertFalse(file.isCompactedAway());
    }
    startScannerThreads();
    // Do compaction
    region.compact(true);

    storefiles = store.getStorefiles();
    int usedReaderCount = 0;
    int unusedReaderCount = 0;
    for (StoreFile file : storefiles) {
      if (file.getRefCount() == 0) {
        unusedReaderCount++;
      }
    }
    compactedfiles =
        ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (StoreFile file : compactedfiles) {
      assertEquals("Refcount should be 3", 3, file.getRefCount());
      usedReaderCount++;
    }
    // The new file created by the compaction is not used by any scanner yet, because the
    // scanners were opened before the compaction finished.
    assertEquals("unused reader count should be 1", 1, unusedReaderCount);
    assertEquals("used reader count should be 3", 3, usedReaderCount);
    // now run the cleaner
    cleaner.chore();
    countDown();
    // No change in the number of store files, as none of the compacted files could be cleaned up
    assertEquals(1, store.getStorefilesCount());
    assertEquals(3,
      ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles().size());
    while (scanCompletedCounter.get() != 3) {
      Thread.sleep(100);
    }
    // reset
    latch = new CountDownLatch(3);
    scanCompletedCounter.set(0);
    counter.set(0);
    // A new scanner should use only the new file created by the compaction
    startScannerThreads();
    storefiles = store.getStorefiles();
    usedReaderCount = 0;
    unusedReaderCount = 0;
    for (StoreFile file : storefiles) {
      if (file.getRefCount() == 3) {
        usedReaderCount++;
      }
    }
    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    for (StoreFile file : compactedfiles) {
      assertEquals("Refcount should be 0", 0, file.getRefCount());
      unusedReaderCount++;
    }
    // The compacted files are still present, but no reads are using them.
    assertEquals("unused reader count should be 3", 3, unusedReaderCount);
    assertEquals("used reader count should be 1", 1, usedReaderCount);
    countDown();
    while (scanCompletedCounter.get() != 3) {
      Thread.sleep(100);
    }
    // Run the cleaner again
    cleaner.chore();
    // Now the cleaner should be able to clear the compacted files because there are no
    // active readers on them.
    assertEquals(1, store.getStorefilesCount());
    storefiles = store.getStorefiles();
    for (StoreFile file : storefiles) {
      // Should not be in compacted state
      assertFalse(file.isCompactedAway());
    }
    compactedfiles = ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
    assertTrue(compactedfiles.size() == 0);
  }

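  // Releases the latch that the three ScanThread instances block on after reading their
  // first row, allowing the in-flight scans to run to completion.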
  protected void countDown() {
    // count down 3 times
    latch.countDown();
    latch.countDown();
    latch.countDown();
  }

  protected void startScannerThreads() throws InterruptedException {
    // Start parallel scan threads
    ScanThread[] scanThreads = new ScanThread[3];
    for (int i = 0; i < 3; i++) {
      scanThreads[i] = new ScanThread((HRegion) region);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    while (counter.get() != 3) {
      Thread.sleep(100);
    }
  }

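  /*
   * Each ScanThread opens a RegionScanner with caching of 1, reads one row, signals its
   * progress through 'counter', and then parks on the shared latch while still holding the
   * store file readers open; this is what keeps the refcounts above zero during the tests.
   */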
  private static class ScanThread extends Thread {
    private final HRegion region;

    public ScanThread(HRegion region) {
      this.region = region;
    }

    @Override
    public void run() {
      try {
        initiateScan(region);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(HRegion region) throws IOException {
      Scan scan = new Scan();
      scan.setCaching(1);
      RegionScanner resScanner = null;
      try {
        resScanner = region.getScanner(scan);
        List<Cell> results = new ArrayList<Cell>();
        boolean next = resScanner.next(results);
        try {
          counter.incrementAndGet();
          latch.await();
        } catch (InterruptedException e) {
        }
        // Drain the remainder of the scan once the latch has been released.
        while (next) {
          next = resScanner.next(results);
        }
      } finally {
        scanCompletedCounter.incrementAndGet();
        if (resScanner != null) {
          resScanner.close();
        }
      }
    }
  }
}