HBASE-5920 New Compactions Logic can silently prevent user-initiated compactions from occurring

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1340280 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-05-18 22:08:44 +00:00
parent 373270dd77
commit ca81969a4f
4 changed files with 127 additions and 38 deletions

View File

@ -50,12 +50,6 @@ public class CompactSplitThread implements CompactionRequestor {
private final ThreadPoolExecutor smallCompactions; private final ThreadPoolExecutor smallCompactions;
private final ThreadPoolExecutor splits; private final ThreadPoolExecutor splits;
/* The default priority for user-specified compaction requests.
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
*/
public static final int PRIORITY_USER = 1;
public static final int NO_PRIORITY = Integer.MIN_VALUE;
/** /**
* Splitting should not take place if the total number of regions exceed this. * Splitting should not take place if the total number of regions exceed this.
* This is not a hard limit to the number of regions but it is a guideline to * This is not a hard limit to the number of regions but it is a guideline to
@ -129,7 +123,7 @@ public class CompactSplitThread implements CompactionRequestor {
public synchronized boolean requestSplit(final HRegion r) { public synchronized boolean requestSplit(final HRegion r) {
// don't split regions that are blocking // don't split regions that are blocking
if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) { if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
byte[] midKey = r.checkSplit(); byte[] midKey = r.checkSplit();
if (midKey != null) { if (midKey != null) {
requestSplit(r, midKey); requestSplit(r, midKey);
@ -158,13 +152,13 @@ public class CompactSplitThread implements CompactionRequestor {
public synchronized void requestCompaction(final HRegion r, public synchronized void requestCompaction(final HRegion r,
final String why) { final String why) {
for(Store s : r.getStores().values()) { for(Store s : r.getStores().values()) {
requestCompaction(r, s, why, NO_PRIORITY); requestCompaction(r, s, why, Store.NO_PRIORITY);
} }
} }
public synchronized void requestCompaction(final HRegion r, final Store s, public synchronized void requestCompaction(final HRegion r, final Store s,
final String why) { final String why) {
requestCompaction(r, s, why, NO_PRIORITY); requestCompaction(r, s, why, Store.NO_PRIORITY);
} }
public synchronized void requestCompaction(final HRegion r, final String why, public synchronized void requestCompaction(final HRegion r, final String why,
@ -185,10 +179,10 @@ public class CompactSplitThread implements CompactionRequestor {
if (this.server.isStopped()) { if (this.server.isStopped()) {
return; return;
} }
CompactionRequest cr = s.requestCompaction(); CompactionRequest cr = s.requestCompaction(priority);
if (cr != null) { if (cr != null) {
cr.setServer(server); cr.setServer(server);
if (priority != NO_PRIORITY) { if (priority != Store.NO_PRIORITY) {
cr.setPriority(priority); cr.setPriority(priority);
} }
ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize()) ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
@ -200,6 +194,11 @@ public class CompactSplitThread implements CompactionRequestor {
+ (why != null && !why.isEmpty() ? "; Because: " + why : "") + (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ "; " + this); + "; " + this);
} }
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting " + r.getRegionNameAsString() +
" because compaction request was cancelled");
}
} }
} }

View File

@ -3532,9 +3532,11 @@ public class HRegionServer implements ClientProtocol,
if (major) { if (major) {
region.triggerMajorCompaction(); region.triggerMajorCompaction();
} }
LOG.trace("User-triggered compaction requested for region " +
region.getRegionNameAsString());
compactSplitThread.requestCompaction(region, compactSplitThread.requestCompaction(region,
"User-triggered " + (major ? "major " : "") + "compaction", "User-triggered " + (major ? "major " : "") + "compaction",
CompactSplitThread.PRIORITY_USER); Store.PRIORITY_USER);
return CompactRegionResponse.newBuilder().build(); return CompactRegionResponse.newBuilder().build();
} catch (IOException ie) { } catch (IOException ie) {
throw new ServiceException(ie); throw new ServiceException(ie);

View File

@ -108,6 +108,7 @@ import com.google.common.collect.Lists;
@InterfaceAudience.Private @InterfaceAudience.Private
public class Store extends SchemaConfigured implements HeapSize { public class Store extends SchemaConfigured implements HeapSize {
static final Log LOG = LogFactory.getLog(Store.class); static final Log LOG = LogFactory.getLog(Store.class);
protected final MemStore memstore; protected final MemStore memstore;
// This stores directory in the filesystem. // This stores directory in the filesystem.
private final Path homedir; private final Path homedir;
@ -133,6 +134,12 @@ public class Store extends SchemaConfigured implements HeapSize {
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final boolean verifyBulkLoads; private final boolean verifyBulkLoads;
/* The default priority for user-specified compaction requests.
 * The user gets top priority unless we have blocking compactions. (Pri <= 0)
 */
public static final int PRIORITY_USER = 1;
// Sentinel meaning "no explicit priority supplied": the no-argument compaction
// overloads pass it, and callers skip overriding the request priority when they see it.
public static final int NO_PRIORITY = Integer.MIN_VALUE;
// not private for testing // not private for testing
/* package */ScanInfo scanInfo; /* package */ScanInfo scanInfo;
/* /*
@ -166,7 +173,7 @@ public class Store extends SchemaConfigured implements HeapSize {
* @param region * @param region
* @param family HColumnDescriptor for this column * @param family HColumnDescriptor for this column
* @param fs file system object * @param fs file system object
* @param conf configuration object * @param confParam configuration object
* failed. Can be null. * failed. Can be null.
* @throws IOException * @throws IOException
*/ */
@ -342,7 +349,7 @@ public class Store extends SchemaConfigured implements HeapSize {
Path getHomedir() { Path getHomedir() {
return homedir; return homedir;
} }
/** /**
* @return the data block encoder * @return the data block encoder
*/ */
@ -464,7 +471,7 @@ public class Store extends SchemaConfigured implements HeapSize {
/** /**
* Removes a kv from the memstore. The KeyValue is removed only * Removes a kv from the memstore. The KeyValue is removed only
* if its key & memstoreTS matches the key & memstoreTS value of the * if its key & memstoreTS matches the key & memstoreTS value of the
* kv parameter. * kv parameter.
* *
* @param kv * @param kv
@ -550,8 +557,8 @@ public class Store extends SchemaConfigured implements HeapSize {
} }
/** /**
* This method should only be called from HRegion. It is assumed that the * This method should only be called from HRegion. It is assumed that the
* ranges of values in the HFile fit within the stores assigned region. * ranges of values in the HFile fit within the stores assigned region.
* (assertBulkLoadHFileOk checks this) * (assertBulkLoadHFileOk checks this)
*/ */
void bulkLoadHFile(String srcPathStr) throws IOException { void bulkLoadHFile(String srcPathStr) throws IOException {
@ -631,7 +638,7 @@ public class Store extends SchemaConfigured implements HeapSize {
ThreadPoolExecutor storeFileCloserThreadPool = this.region ThreadPoolExecutor storeFileCloserThreadPool = this.region
.getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-" .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+ this.family.getNameAsString()); + this.family.getNameAsString());
// close each store file in parallel // close each store file in parallel
CompletionService<Void> completionService = CompletionService<Void> completionService =
new ExecutorCompletionService<Void>(storeFileCloserThreadPool); new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
@ -643,7 +650,7 @@ public class Store extends SchemaConfigured implements HeapSize {
} }
}); });
} }
try { try {
for (int i = 0; i < result.size(); i++) { for (int i = 0; i < result.size(); i++) {
Future<Void> future = completionService.take(); Future<Void> future = completionService.take();
@ -772,7 +779,7 @@ public class Store extends SchemaConfigured implements HeapSize {
scanner.close(); scanner.close();
} }
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info("Flushed " + LOG.info("Flushed " +
", sequenceid=" + logCacheFlushId + ", sequenceid=" + logCacheFlushId +
", memsize=" + StringUtils.humanReadableInt(flushed) + ", memsize=" + StringUtils.humanReadableInt(flushed) +
", into tmp file " + pathName); ", into tmp file " + pathName);
@ -983,7 +990,7 @@ public class Store extends SchemaConfigured implements HeapSize {
* <p>We don't want to hold the structureLock for the whole time, as a compact() * <p>We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period. * can be lengthy and we want to allow cache-flushes during this period.
* *
* @param CompactionRequest * @param cr
* compaction details obtained from requestCompaction() * compaction details obtained from requestCompaction()
* @throws IOException * @throws IOException
* @return Storefile we compacted into or null if we failed or opted out early. * @return Storefile we compacted into or null if we failed or opted out early.
@ -1221,7 +1228,7 @@ public class Store extends SchemaConfigured implements HeapSize {
if (jitterPct > 0) { if (jitterPct > 0) {
long jitter = Math.round(ret * jitterPct); long jitter = Math.round(ret * jitterPct);
// deterministic jitter avoids a major compaction storm on restart // deterministic jitter avoids a major compaction storm on restart
ImmutableList<StoreFile> snapshot = storefiles; ImmutableList<StoreFile> snapshot = storefiles;
if (snapshot != null && !snapshot.isEmpty()) { if (snapshot != null && !snapshot.isEmpty()) {
String seed = snapshot.get(0).getPath().getName(); String seed = snapshot.get(0).getPath().getName();
double curRand = new Random(seed.hashCode()).nextDouble(); double curRand = new Random(seed.hashCode()).nextDouble();
@ -1235,6 +1242,10 @@ public class Store extends SchemaConfigured implements HeapSize {
} }
public CompactionRequest requestCompaction() { public CompactionRequest requestCompaction() {
return requestCompaction(NO_PRIORITY);
}
public CompactionRequest requestCompaction(int priority) {
// don't even select for compaction if writes are disabled // don't even select for compaction if writes are disabled
if (!this.region.areWritesEnabled()) { if (!this.region.areWritesEnabled()) {
return null; return null;
@ -1265,7 +1276,7 @@ public class Store extends SchemaConfigured implements HeapSize {
// coprocessor is overriding normal file selection // coprocessor is overriding normal file selection
filesToCompact = new CompactSelection(conf, candidates); filesToCompact = new CompactSelection(conf, candidates);
} else { } else {
filesToCompact = compactSelection(candidates); filesToCompact = compactSelection(candidates, priority);
} }
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
@ -1295,7 +1306,7 @@ public class Store extends SchemaConfigured implements HeapSize {
} }
// everything went better than expected. create a compaction request // everything went better than expected. create a compaction request
int pri = getCompactPriority(); int pri = getCompactPriority(priority);
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri); ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
} }
} catch (IOException ex) { } catch (IOException ex) {
@ -1314,6 +1325,16 @@ public class Store extends SchemaConfigured implements HeapSize {
} }
} }
/**
 * Chooses which files to compact when the caller supplies no explicit
 * priority. Delegates to {@link #compactSelection(java.util.List, int)},
 * passing {@code NO_PRIORITY}.
 *
 * @param candidates store files to consider for compaction
 * @return the selection produced by the two-argument overload
 * @throws IOException if the delegated selection throws it
 */
CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
  final int unspecifiedPriority = NO_PRIORITY;
  return compactSelection(candidates, unspecifiedPriority);
}
/** /**
* Algorithm to choose which files to compact * Algorithm to choose which files to compact
* *
@ -1333,7 +1354,7 @@ public class Store extends SchemaConfigured implements HeapSize {
* @return subset copy of candidate list that meets compaction criteria * @return subset copy of candidate list that meets compaction criteria
* @throws IOException * @throws IOException
*/ */
CompactSelection compactSelection(List<StoreFile> candidates) CompactSelection compactSelection(List<StoreFile> candidates, int priority)
throws IOException { throws IOException {
// ASSUMPTION!!! filesCompacting is locked when calling this function // ASSUMPTION!!! filesCompacting is locked when calling this function
@ -1381,10 +1402,16 @@ public class Store extends SchemaConfigured implements HeapSize {
return compactSelection; return compactSelection;
} }
// major compact on user action or age (caveat: we have too many files) // Force a major compaction if this is a user-requested major compaction,
boolean majorcompaction = // or if we do not have too many files to compact and this was requested
(forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) // as a major compaction
&& compactSelection.getFilesToCompact().size() < this.maxFilesToCompact; boolean majorcompaction = (forcemajor && priority == PRIORITY_USER) ||
(forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
(compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
);
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
this.getColumnFamilyName() + ": Initiating " +
(majorcompaction ? "major" : "minor") + "compaction");
if (!majorcompaction && if (!majorcompaction &&
!hasReferences(compactSelection.getFilesToCompact())) { !hasReferences(compactSelection.getFilesToCompact())) {
@ -1394,6 +1421,11 @@ public class Store extends SchemaConfigured implements HeapSize {
// skip selection algorithm if we don't have enough files // skip selection algorithm if we don't have enough files
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) { if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting files because we only have " +
compactSelection.getFilesToCompact().size() +
" files ready for compaction. Need " + this.minFilesToCompact + " to initiate.");
}
compactSelection.emptyFileList(); compactSelection.emptyFileList();
return compactSelection; return compactSelection;
} }
@ -1461,11 +1493,18 @@ public class Store extends SchemaConfigured implements HeapSize {
return compactSelection; return compactSelection;
} }
} else { } else {
// all files included in this compaction, up to max if(majorcompaction) {
if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) { if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
int pastMax = LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
compactSelection.getFilesToCompact().size() - this.maxFilesToCompact; " files, probably because of a user-requested major compaction");
compactSelection.clearSubList(0, pastMax); if(priority != PRIORITY_USER) {
LOG.error("Compacting more than max files on a non user-requested compaction");
}
}
} else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
// all files included in this compaction, up to max
int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
compactSelection.getFilesToCompact().subList(0, pastMax).clear();
} }
} }
return compactSelection; return compactSelection;
@ -1991,11 +2030,21 @@ public class Store extends SchemaConfigured implements HeapSize {
return this.memstore.heapSize(); return this.memstore.heapSize();
} }
/**
 * Convenience overload for callers that have no explicit priority.
 *
 * @return the result of {@code getCompactPriority(NO_PRIORITY)}
 */
public int getCompactPriority() {
  final int unspecifiedPriority = NO_PRIORITY;
  return getCompactPriority(unspecifiedPriority);
}
/** /**
* @return The priority that this store should have in the compaction queue * @return The priority that this store should have in the compaction queue
* @param priority
*/ */
public int getCompactPriority() { public int getCompactPriority(int priority) {
return this.blockingStoreFileCount - this.storefiles.size(); // If this is a user-requested compaction, leave this at the highest priority
if(priority == PRIORITY_USER) {
return PRIORITY_USER;
} else {
return this.blockingStoreFileCount - this.storefiles.size();
}
} }
boolean throttleCompaction(long compactionSize) { boolean throttleCompaction(long compactionSize) {
@ -2131,7 +2180,7 @@ public class Store extends SchemaConfigured implements HeapSize {
return this.cacheConf; return this.cacheConf;
} }
public static final long FIXED_OVERHEAD = public static final long FIXED_OVERHEAD =
ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE + ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
+ (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG) + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
@ -2209,7 +2258,7 @@ public class Store extends SchemaConfigured implements HeapSize {
public boolean getKeepDeletedCells() { public boolean getKeepDeletedCells() {
return keepDeletedCells; return keepDeletedCells;
} }
public long getTimeToPurgeDeletes() { public long getTimeToPurgeDeletes() {
return timeToPurgeDeletes; return timeToPurgeDeletes;
} }

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -76,6 +77,7 @@ public class TestCompaction extends HBaseTestCase {
private int compactionThreshold; private int compactionThreshold;
private byte[] firstRowBytes, secondRowBytes, thirdRowBytes; private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
final private byte[] col1, col2; final private byte[] col1, col2;
private static final long MAX_FILES_TO_COMPACT = 10;
/** constructor */ /** constructor */
public TestCompaction() throws Exception { public TestCompaction() throws Exception {
@ -614,6 +616,43 @@ public class TestCompaction extends HBaseTestCase {
fail("testCompactionWithCorruptResult failed since no exception was" + fail("testCompactionWithCorruptResult failed since no exception was" +
"thrown while completing a corrupt file"); "thrown while completing a corrupt file");
} }
/**
 * Test for HBASE-5920 - a major compaction that was triggered on the store but
 * requested without user priority ({@code Store.NO_PRIORITY}) must still yield
 * a compaction request, and that request must not be flagged as major.
 */
public void testNonUserMajorCompactionRequest() throws Exception {
  Store store = r.getStore(COLUMN_FAMILY);
  // One initial store file plus MAX_FILES_TO_COMPACT + 1 more, created in a single loop.
  // NOTE(review): presumably this exceeds the store's configured max files to compact - confirm.
  final long storeFilesToCreate = MAX_FILES_TO_COMPACT + 2;
  for (long created = 0; created < storeFilesToCreate; created++) {
    createStoreFile(r);
  }
  store.triggerMajorCompaction();

  CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY);
  assertNotNull("Expected to receive a compaction request", request);
  final boolean requestedAsMajor = request.isMajor();
  assertEquals(
    "System-requested major compaction should not occur if there are too many store files",
    false,
    requestedAsMajor);
}
/**
 * Test for HBASE-5920 - a major compaction that was triggered on the store and
 * requested with {@code Store.PRIORITY_USER} must yield a compaction request
 * that is flagged as major.
 */
public void testUserMajorCompactionRequest() throws IOException {
  Store store = r.getStore(COLUMN_FAMILY);
  // One initial store file plus MAX_FILES_TO_COMPACT + 1 more, created in a single loop.
  // NOTE(review): presumably this exceeds the store's configured max files to compact - confirm.
  final long storeFilesToCreate = MAX_FILES_TO_COMPACT + 2;
  for (long created = 0; created < storeFilesToCreate; created++) {
    createStoreFile(r);
  }
  store.triggerMajorCompaction();

  CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER);
  assertNotNull("Expected to receive a compaction request", request);
  final boolean requestedAsMajor = request.isMajor();
  assertEquals(
    "User-requested major compaction should always occur, even if there are too many store files",
    true,
    requestedAsMajor);
}
@org.junit.Rule @org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =