HBASE-938 major compaction period is not checked periodically

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@714200 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2008-11-15 00:36:27 +00:00
parent b5bdaf2c1f
commit 79be2d87f7
8 changed files with 241 additions and 80 deletions

View File

@ -119,6 +119,7 @@ Release 0.19.0 - Unreleased
HBASE-998 Narrow getClosestRowBefore by passing column family HBASE-998 Narrow getClosestRowBefore by passing column family
HBASE-999 Up versions on historian and keep history of deleted regions for a HBASE-999 Up versions on historian and keep history of deleted regions for a
while rather than delete immediately while rather than delete immediately
HBASE-938 Major compaction period is not checked periodically
NEW FEATURES NEW FEATURES
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters] HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]

View File

@ -117,13 +117,16 @@ class CompactSplitThread extends Thread implements HConstants {
/** /**
* @param r HRegion store belongs to * @param r HRegion store belongs to
* @param why Why compaction requested -- used in debug messages
*/ */
public synchronized void compactionRequested(HRegion r) { public synchronized void compactionRequested(final HRegion r,
final String why) {
if (this.server.stopRequested.get()) { if (this.server.stopRequested.get()) {
return; return;
} }
LOG.debug("Compaction requested for region: " + LOG.debug("Compaction requested for region " +
Bytes.toString(r.getRegionName())); Bytes.toString(r.getRegionName()) +
(why != null && !why.isEmpty()? " because: " + why: ""));
synchronized (regionsInQueue) { synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r)) { if (!regionsInQueue.contains(r)) {
compactionQueue.add(r); compactionQueue.add(r);

View File

@ -1336,8 +1336,8 @@ public class HRegion implements HConstants {
try { try {
for (HStore store : stores.values()) { for (HStore store : stores.values()) {
List<HStoreKey> keys = List<HStoreKey> keys =
store.getKeys(new HStoreKey(row, ts, this.regionInfo), ALL_VERSIONS, store.getKeys(new HStoreKey(row, ts, this.regionInfo),
now, null); ALL_VERSIONS, now, null);
TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>( TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
new HStoreKey.HStoreKeyWritableComparator(regionInfo)); new HStoreKey.HStoreKeyWritableComparator(regionInfo));
for (HStoreKey key: keys) { for (HStoreKey key: keys) {
@ -1369,7 +1369,8 @@ public class HRegion implements HConstants {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
try { try {
for (HStore store : stores.values()) { for (HStore store : stores.values()) {
List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp, this.regionInfo), List<HStoreKey> keys =
store.getKeys(new HStoreKey(row, timestamp, this.regionInfo),
ALL_VERSIONS, now, columnPattern); ALL_VERSIONS, now, columnPattern);
TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>( TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
new HStoreKey.HStoreKeyWritableComparator(regionInfo)); new HStoreKey.HStoreKeyWritableComparator(regionInfo));
@ -2401,6 +2402,19 @@ public class HRegion implements HConstants {
listPaths(fs, r.getRegionDir()); listPaths(fs, r.getRegionDir());
} }
/**
* @return True if needs a mojor compaction.
* @throws IOException
*/
boolean isMajorCompaction() throws IOException {
for (HStore store: this.stores.values()) {
if (store.isMajorCompaction()) {
return true;
}
}
return false;
}
/* /*
* List the files under the specified directory * List the files under the specified directory
* *

View File

@ -52,6 +52,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -175,38 +176,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
private RegionServerMetrics metrics; private RegionServerMetrics metrics;
/**
* Thread to shutdown the region server in an orderly manner. This thread
* is registered as a shutdown hook in the HRegionServer constructor and is
* only called when the HRegionServer receives a kill signal.
*/
class ShutdownThread extends Thread {
private final HRegionServer instance;
/**
* @param instance
*/
public ShutdownThread(HRegionServer instance) {
this.instance = instance;
}
@Override
public void run() {
LOG.info("Starting shutdown thread.");
// tell the region server to stop and wait for it to complete
instance.stop();
instance.join();
LOG.info("Shutdown thread complete");
}
}
// Compactions // Compactions
final CompactSplitThread compactSplitThread; final CompactSplitThread compactSplitThread;
// Cache flushing // Cache flushing
final MemcacheFlusher cacheFlusher; final MemcacheFlusher cacheFlusher;
/* Check for major compactions.
*/
final Chore majorCompactionChecker;
// HLog and HLog roller. log is protected rather than private to avoid // HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes // eclipse warning when accessed by inner classes
protected volatile HLog log; protected volatile HLog log;
@ -261,6 +240,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.logFlusher = this.logFlusher =
new LogFlusher(this.threadWakeFrequency, this.stopRequested); new LogFlusher(this.threadWakeFrequency, this.stopRequested);
// Background thread to check for major compactions; needed if region
// has not gotten updates in a while. Make it run at a lesser frequency.
int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
".multiplier", 1000);
this.majorCompactionChecker = new MajorCompactionChecker(this,
this.threadWakeFrequency * multiplier, this.stopRequested);
// Task thread to process requests from Master // Task thread to process requests from Master
this.worker = new Worker(); this.worker = new Worker();
this.workerThread = new Thread(worker); this.workerThread = new Thread(worker);
@ -474,6 +460,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
logFlusher.interrupt(); logFlusher.interrupt();
compactSplitThread.interruptIfNecessary(); compactSplitThread.interruptIfNecessary();
logRoller.interruptIfNecessary(); logRoller.interruptIfNecessary();
this.majorCompactionChecker.interrupt();
if (abortRequested) { if (abortRequested) {
if (this.fsOk) { if (this.fsOk) {
@ -572,6 +559,66 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} }
} }
/*
* Thread to shutdown the region server in an orderly manner. This thread
* is registered as a shutdown hook in the HRegionServer constructor and is
* only called when the HRegionServer receives a kill signal.
*/
private static class ShutdownThread extends Thread {
private final Log LOG = LogFactory.getLog(this.getClass());
private final HRegionServer instance;
/**
* @param instance
*/
public ShutdownThread(HRegionServer instance) {
this.instance = instance;
}
@Override
public void run() {
LOG.info("Starting shutdown thread.");
// tell the region server to stop and wait for it to complete
instance.stop();
instance.join();
LOG.info("Shutdown thread complete");
}
}
/*
* Inner class that runs on a long period checking if regions need major
* compaction.
*/
private static class MajorCompactionChecker extends Chore {
private final Log LOG = LogFactory.getLog(this.getClass());
private final HRegionServer instance;
MajorCompactionChecker(final HRegionServer h,
final int sleepTime, final AtomicBoolean stopper) {
super(sleepTime, stopper);
this.instance = h;
LOG.info("Runs every " + sleepTime + "ms");
}
@Override
protected void chore() {
Set<Integer> keys = this.instance.onlineRegions.keySet();
for (Integer i: keys) {
HRegion r = this.instance.onlineRegions.get(i);
try {
if (r != null && r.isMajorCompaction()) {
// Queue a compaction. Will recognize if major is needed.
this.instance.compactSplitThread.
compactionRequested(r, getName() + " requests major compaction");
}
} catch (IOException e) {
LOG.warn("Failed major compaction check on " + r, e);
}
}
}
};
/** /**
* Report the status of the server. A server is online once all the startup * Report the status of the server. A server is online once all the startup
* is completed (setting up filesystem, starting service threads, etc.). This * is completed (setting up filesystem, starting service threads, etc.). This
@ -660,6 +707,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor", Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
handler); handler);
Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler); Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
Threads.setDaemonThreadRunning(this.majorCompactionChecker,
n + ".majorCompactionChecker", handler);
// Leases is not a Thread. Internally it runs a daemon thread. If it gets // Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit. // an unhandled exception, it will just exit.
this.leases.setName(n + ".leaseChecker"); this.leases.setName(n + ".leaseChecker");
@ -690,7 +740,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Verify that all threads are alive // Verify that all threads are alive
if (!(leases.isAlive() && compactSplitThread.isAlive() && if (!(leases.isAlive() && compactSplitThread.isAlive() &&
cacheFlusher.isAlive() && logRoller.isAlive() && cacheFlusher.isAlive() && logRoller.isAlive() &&
workerThread.isAlive())) { workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
// One or more threads are no longer alive - shut down // One or more threads are no longer alive - shut down
stop(); stop();
return false; return false;
@ -750,20 +800,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* Presumption is that all closes and stops have already been called. * Presumption is that all closes and stops have already been called.
*/ */
void join() { void join() {
join(this.workerThread); Threads.shutdown(this.majorCompactionChecker);
join(this.cacheFlusher); Threads.shutdown(this.workerThread);
join(this.compactSplitThread); Threads.shutdown(this.cacheFlusher);
join(this.logRoller); Threads.shutdown(this.compactSplitThread);
} Threads.shutdown(this.logRoller);
private void join(final Thread t) {
while (t.isAlive()) {
try {
t.join();
} catch (InterruptedException e) {
// continue
}
}
} }
/* /*
@ -925,13 +966,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Force split a region // Force split a region
HRegion region = getRegion(info.getRegionName()); HRegion region = getRegion(info.getRegionName());
region.regionInfo.shouldSplit(true); region.regionInfo.shouldSplit(true);
compactSplitThread.compactionRequested(region); compactSplitThread.compactionRequested(region,
"MSG_REGION_SPLIT");
} break; } break;
case MSG_REGION_COMPACT: { case MSG_REGION_COMPACT: {
// Compact a region // Compact a region
HRegion region = getRegion(info.getRegionName()); HRegion region = getRegion(info.getRegionName());
compactSplitThread.compactionRequested(region); compactSplitThread.compactionRequested(region,
"MSG_REGION_COMPACT");
} break; } break;
default: default:
@ -983,7 +1026,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
try { try {
region = instantiateRegion(regionInfo); region = instantiateRegion(regionInfo);
// Startup a compaction early if one is needed. // Startup a compaction early if one is needed.
this.compactSplitThread.compactionRequested(region); this.compactSplitThread.
compactionRequested(region, "Region open check");
} catch (IOException e) { } catch (IOException e) {
LOG.error("error opening region " + regionInfo.getRegionNameAsString(), e); LOG.error("error opening region " + regionInfo.getRegionNameAsString(), e);

View File

@ -435,6 +435,7 @@ public class HStore implements HConstants {
curfile = new HStoreFile(conf, fs, basedir, this.info, curfile = new HStoreFile(conf, fs, basedir, this.info,
family.getName(), fid, reference); family.getName(), fid, reference);
long storeSeqId = -1; long storeSeqId = -1;
boolean majorCompaction = false;
try { try {
storeSeqId = curfile.loadInfo(fs); storeSeqId = curfile.loadInfo(fs);
if (storeSeqId > this.maxSeqId) { if (storeSeqId > this.maxSeqId) {
@ -488,7 +489,9 @@ public class HStore implements HConstants {
// Found map and sympathetic info file. Add this hstorefile to result. // Found map and sympathetic info file. Add this hstorefile to result.
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" + LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" +
isReference + ", sequence id=" + storeSeqId + ", length=" + length); isReference + ", sequence id=" + storeSeqId +
", length=" + length + ", majorCompaction=" +
curfile.isMajorCompaction());
} }
results.put(Long.valueOf(storeSeqId), curfile); results.put(Long.valueOf(storeSeqId), curfile);
// Keep list of sympathetic data mapfiles for cleaning info dir in next // Keep list of sympathetic data mapfiles for cleaning info dir in next
@ -691,7 +694,8 @@ public class HStore implements HConstants {
" with " + entries + " with " + entries +
" entries, sequence id " + logCacheFlushId + ", data size " + " entries, sequence id " + logCacheFlushId + ", data size " +
StringUtils.humanReadableInt(flushed) + ", file size " + StringUtils.humanReadableInt(flushed) + ", file size " +
StringUtils.humanReadableInt(newStoreSize)); StringUtils.humanReadableInt(newStoreSize) + " to " +
this.info.getRegionNameAsString());
} }
} }
return storefiles.size() >= compactionThreshold; return storefiles.size() >= compactionThreshold;
@ -832,11 +836,11 @@ public class HStore implements HConstants {
// Check to see if we need to do a major compaction on this region. // Check to see if we need to do a major compaction on this region.
// If so, change doMajorCompaction to true to skip the incremental // If so, change doMajorCompaction to true to skip the incremental
// compacting below. Only check if doMajorCompaction is not true. // compacting below. Only check if doMajorCompaction is not true.
long lastMajorCompaction = 0L;
if (!doMajorCompaction) { if (!doMajorCompaction) {
doMajorCompaction = isMajorCompaction(); doMajorCompaction = isMajorCompaction(filesToCompact);
} }
if (!doMajorCompaction && !hasReferences(filesToCompact) && boolean references = hasReferences(filesToCompact);
if (!doMajorCompaction && !references &&
filesToCompact.size() < compactionThreshold) { filesToCompact.size() < compactionThreshold) {
return checkSplit(forceSplit); return checkSplit(forceSplit);
} }
@ -862,7 +866,7 @@ public class HStore implements HConstants {
fileSizes[i] = len; fileSizes[i] = len;
totalSize += len; totalSize += len;
} }
if (!doMajorCompaction && !hasReferences(filesToCompact)) { if (!doMajorCompaction && !references) {
// Here we select files for incremental compaction. // Here we select files for incremental compaction.
// The rule is: if the largest(oldest) one is more than twice the // The rule is: if the largest(oldest) one is more than twice the
// size of the second, skip the largest, and continue to next..., // size of the second, skip the largest, and continue to next...,
@ -888,7 +892,7 @@ public class HStore implements HConstants {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Compaction size of " + this.storeNameStr + ": " + LOG.debug("Compaction size of " + this.storeNameStr + ": " +
StringUtils.humanReadableInt(totalSize) + "; Skipped " + point + StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
" files , size: " + skipped); " file(s), size: " + skipped);
} }
} }
@ -904,7 +908,8 @@ public class HStore implements HConstants {
HStoreFile compactedOutputFile = new HStoreFile(conf, fs, HStoreFile compactedOutputFile = new HStoreFile(conf, fs,
this.compactionDir, this.info, family.getName(), -1L, null); this.compactionDir, this.info, family.getName(), -1L, null);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + rdrs.size() + " files into " + LOG.debug("Started compaction of " + rdrs.size() + " file(s)" +
(references? "(hasReferences=true)": " ") + " into " +
FSUtils.getPath(compactedOutputFile.getMapFilePath())); FSUtils.getPath(compactedOutputFile.getMapFilePath()));
} }
MapFile.Writer writer = compactedOutputFile.getWriter(this.fs, MapFile.Writer writer = compactedOutputFile.getWriter(this.fs,
@ -917,15 +922,14 @@ public class HStore implements HConstants {
} }
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap. // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
compactedOutputFile.writeInfo(fs, maxId); compactedOutputFile.writeInfo(fs, maxId, doMajorCompaction);
// Move the compaction into place. // Move the compaction into place.
completeCompaction(filesToCompact, compactedOutputFile); completeCompaction(filesToCompact, compactedOutputFile);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Completed compaction of " + this.storeNameStr + LOG.debug("Completed " + (doMajorCompaction? "major": "") +
" store size is " + StringUtils.humanReadableInt(storeSize) + " compaction of " + this.storeNameStr +
(doMajorCompaction? "": "; time since last major compaction: " + " store size is " + StringUtils.humanReadableInt(storeSize));
(lastMajorCompaction/1000) + " seconds"));
} }
} }
return checkSplit(forceSplit); return checkSplit(forceSplit);
@ -955,19 +959,40 @@ public class HStore implements HConstants {
/* /*
* @return True if we should run a major compaction. * @return True if we should run a major compaction.
*/ */
private boolean isMajorCompaction() throws IOException { boolean isMajorCompaction() throws IOException {
return isMajorCompaction(null);
}
/*
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
private boolean isMajorCompaction(final List<HStoreFile> filesToCompact)
throws IOException {
boolean result = false; boolean result = false;
Path mapdir = HStoreFile.getMapDir(this.basedir, this.info.getEncodedName(), Path mapdir = HStoreFile.getMapDir(this.basedir, this.info.getEncodedName(),
this.family.getName()); this.family.getName());
long lowTimestamp = getLowestTimestamp(fs, mapdir); long lowTimestamp = getLowestTimestamp(fs, mapdir);
if (lowTimestamp < (System.currentTimeMillis() - this.majorCompactionTime) && if (lowTimestamp < (System.currentTimeMillis() - this.majorCompactionTime) &&
lowTimestamp > 0l) { lowTimestamp > 0l) {
if (LOG.isDebugEnabled()) { // Major compaction time has elapsed.
LOG.debug("Major compaction triggered on store: " + long elapsedTime = System.currentTimeMillis() - lowTimestamp;
this.storeNameStr + ". Time since last major compaction: " + if (filesToCompact != null && filesToCompact.size() == 1 &&
((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds"); filesToCompact.get(0).isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || elapsedTime < this.ttl)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping major compaction because only one (major) " +
"compacted file only and elapsedTime " + elapsedTime +
" is < ttl=" + this.ttl);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on store: " +
this.storeNameStr + ". Time since last major compaction: " +
((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
}
result = true;
} }
result = true;
} }
return result; return result;
} }
@ -1160,7 +1185,8 @@ public class HStore implements HConstants {
try { try {
// 1. Moving the new MapFile into place. // 1. Moving the new MapFile into place.
HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir, HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
this.info, family.getName(), -1, null); this.info, family.getName(), -1, null,
compactedFile.isMajorCompaction());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("moving " + FSUtils.getPath(compactedFile.getMapFilePath()) + LOG.debug("moving " + FSUtils.getPath(compactedFile.getMapFilePath()) +
" to " + FSUtils.getPath(finalCompactedFile.getMapFilePath())); " to " + FSUtils.getPath(finalCompactedFile.getMapFilePath()));

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.io.SequenceFile;
public class HStoreFile implements HConstants { public class HStoreFile implements HConstants {
static final Log LOG = LogFactory.getLog(HStoreFile.class.getName()); static final Log LOG = LogFactory.getLog(HStoreFile.class.getName());
static final byte INFO_SEQ_NUM = 0; static final byte INFO_SEQ_NUM = 0;
static final byte MAJOR_COMPACTION = INFO_SEQ_NUM + 1;
static final String HSTORE_DATFILE_DIR = "mapfiles"; static final String HSTORE_DATFILE_DIR = "mapfiles";
static final String HSTORE_INFO_DIR = "info"; static final String HSTORE_INFO_DIR = "info";
static final String HSTORE_FILTER_DIR = "filter"; static final String HSTORE_FILTER_DIR = "filter";
@ -97,6 +98,9 @@ public class HStoreFile implements HConstants {
private final FileSystem fs; private final FileSystem fs;
private final Reference reference; private final Reference reference;
private final HRegionInfo hri; private final HRegionInfo hri;
/* If true, this file was product of a major compaction.
*/
private boolean majorCompaction = false;
/** /**
* Constructor that fully initializes the object * Constructor that fully initializes the object
@ -111,6 +115,24 @@ public class HStoreFile implements HConstants {
HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir, HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir,
final HRegionInfo hri, byte [] colFamily, long fileId, final HRegionInfo hri, byte [] colFamily, long fileId,
final Reference ref) final Reference ref)
throws IOException {
this(conf, fs, basedir, hri, colFamily, fileId, ref, false);
}
/**
* Constructor that fully initializes the object
* @param conf Configuration object
* @param basedir qualified path that is parent of region directory
* @param colFamily name of the column family
* @param fileId file identifier
* @param ref Reference to another HStoreFile.
* @param hri The region info for this file (HACK HBASE-868). TODO: Fix.
* @param mc Try if this file was result of a major compression.
* @throws IOException
*/
HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir,
final HRegionInfo hri, byte [] colFamily, long fileId,
final Reference ref, final boolean mc)
throws IOException { throws IOException {
this.conf = conf; this.conf = conf;
this.fs = fs; this.fs = fs;
@ -133,6 +155,7 @@ public class HStoreFile implements HConstants {
// If a reference, construction does not write the pointer files. Thats // If a reference, construction does not write the pointer files. Thats
// done by invocations of writeReferenceFiles(hsf, fs). Happens at split. // done by invocations of writeReferenceFiles(hsf, fs). Happens at split.
this.reference = ref; this.reference = ref;
this.majorCompaction = mc;
} }
/** @return the region name */ /** @return the region name */
@ -288,11 +311,11 @@ public class HStoreFile implements HConstants {
/** /**
* Reads in an info file * Reads in an info file
* *
* @param fs file system * @param filesystem file system
* @return The sequence id contained in the info file * @return The sequence id contained in the info file
* @throws IOException * @throws IOException
*/ */
long loadInfo(FileSystem fs) throws IOException { long loadInfo(final FileSystem filesystem) throws IOException {
Path p = null; Path p = null;
if (isReference()) { if (isReference()) {
p = getInfoFilePath(reference.getEncodedRegionName(), p = getInfoFilePath(reference.getEncodedRegionName(),
@ -300,10 +323,18 @@ public class HStoreFile implements HConstants {
} else { } else {
p = getInfoFilePath(); p = getInfoFilePath();
} }
DataInputStream in = new DataInputStream(fs.open(p)); long length = filesystem.getFileStatus(p).getLen();
boolean hasMoreThanSeqNum = length > (Byte.SIZE + Bytes.SIZEOF_LONG);
DataInputStream in = new DataInputStream(filesystem.open(p));
try { try {
byte flag = in.readByte(); byte flag = in.readByte();
if(flag == INFO_SEQ_NUM) { if (flag == INFO_SEQ_NUM) {
if (hasMoreThanSeqNum) {
flag = in.readByte();
if (flag == MAJOR_COMPACTION) {
this.majorCompaction = in.readBoolean();
}
}
return in.readLong(); return in.readLong();
} }
throw new IOException("Cannot process log file: " + p); throw new IOException("Cannot process log file: " + p);
@ -315,16 +346,37 @@ public class HStoreFile implements HConstants {
/** /**
* Writes the file-identifier to disk * Writes the file-identifier to disk
* *
* @param fs file system * @param filesystem file system
* @param infonum file id * @param infonum file id
* @throws IOException * @throws IOException
*/ */
void writeInfo(FileSystem fs, long infonum) throws IOException { void writeInfo(final FileSystem filesystem, final long infonum)
throws IOException {
writeInfo(filesystem, infonum, false);
}
/**
* Writes the file-identifier to disk
*
* @param filesystem file system
* @param infonum file id
* @param mc True if this file is product of a major compaction
* @throws IOException
*/
void writeInfo(final FileSystem filesystem, final long infonum,
final boolean mc)
throws IOException {
Path p = getInfoFilePath(); Path p = getInfoFilePath();
FSDataOutputStream out = fs.create(p); FSDataOutputStream out = filesystem.create(p);
try { try {
out.writeByte(INFO_SEQ_NUM); out.writeByte(INFO_SEQ_NUM);
out.writeLong(infonum); out.writeLong(infonum);
if (mc) {
// Set whether major compaction flag on this file.
this.majorCompaction = mc;
out.writeByte(MAJOR_COMPACTION);
out.writeBoolean(mc);
}
} finally { } finally {
out.close(); out.close();
} }
@ -431,6 +483,13 @@ public class HStoreFile implements HConstants {
(isReference()? "-" + reference.toString(): ""); (isReference()? "-" + reference.toString(): "");
} }
/**
* @return True if this file was made by a major compaction.
*/
public boolean isMajorCompaction() {
return this.majorCompaction;
}
private static String createHStoreFilename(final long fid, private static String createHStoreFilename(final long fid,
final int encodedRegionName) { final int encodedRegionName) {
return Long.toString(fid) + return Long.toString(fid) +

View File

@ -177,7 +177,7 @@ class MemcacheFlusher extends Thread implements FlushRequester {
// compact if removeFromQueue is true. Note that region.flushCache() // compact if removeFromQueue is true. Note that region.flushCache()
// only returns true if a flush is done and if a compaction is needed. // only returns true if a flush is done and if a compaction is needed.
if (region.flushcache() && !removeFromQueue) { if (region.flushcache() && !removeFromQueue) {
server.compactSplitThread.compactionRequested(region); server.compactSplitThread.compactionRequested(region, getName());
} }
} catch (DroppedSnapshotException ex) { } catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical // Cache flush can fail in a few places. If it fails in a critical

View File

@ -57,4 +57,18 @@ public class Threads {
t.start(); t.start();
return t; return t;
} }
/**
* Shutdown passed thread using isAlive and join.
* @param t Thread to shutdown
*/
public static void shutdown(final Thread t) {
while (t.isAlive()) {
try {
t.join();
} catch (InterruptedException e) {
LOG.warn(t.getName(), e);
}
}
}
} }