HBASE-7822 clean up compactionrequest and compactselection - part 1 (Sergey)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1445696 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-02-13 15:56:18 +00:00
parent a8658d520e
commit c69c94062a
13 changed files with 274 additions and 249 deletions

View File

@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.UnknownProtocolException; import org.apache.hadoop.hbase.ipc.UnknownProtocolException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@ -294,6 +295,9 @@ public class HRegion implements HeapSize { // , Writable{
*/ */
private boolean isLoadingCfsOnDemandDefault = false; private boolean isLoadingCfsOnDemandDefault = false;
private final AtomicInteger majorInProgress = new AtomicInteger(0);
private final AtomicInteger minorInProgress = new AtomicInteger(0);
/** /**
* @return The smallest mvcc readPoint across all the scanners in this * @return The smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every * region. Writes older than this readPoint, are included in every
@ -5028,7 +5032,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align( public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + ClassSize.OBJECT +
ClassSize.ARRAY + ClassSize.ARRAY +
39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + 41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(10 * Bytes.SIZEOF_LONG) + (10 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN); Bytes.SIZEOF_BOOLEAN);
@ -5499,6 +5503,24 @@ public class HRegion implements HeapSize { // , Writable{
return this.openSeqNum; return this.openSeqNum;
} }
/**
* @return if a given region is in compaction now.
*/
public CompactionState getCompactionState() {
boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
: (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
}
public void reportCompactionRequestStart(boolean isMajor){
(isMajor ? majorInProgress : minorInProgress).incrementAndGet();
}
public void reportCompactionRequestEnd(boolean isMajor){
int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
assert newValue >= 0;
}
/** /**
* Listener class to enable callers of * Listener class to enable callers of
* bulkLoadHFile() to perform any necessary * bulkLoadHFile() to perform any necessary

View File

@ -3303,8 +3303,7 @@ public class HRegionServer implements ClientProtocol,
GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder(); GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
builder.setRegionInfo(HRegionInfo.convert(info)); builder.setRegionInfo(HRegionInfo.convert(info));
if (request.hasCompactionState() && request.getCompactionState()) { if (request.hasCompactionState() && request.getCompactionState()) {
builder.setCompactionState( builder.setCompactionState(region.getCompactionState());
CompactionRequest.getCompactionState(info.getRegionId()));
} }
return builder.build(); return builder.build();
} catch (IOException ie) { } catch (IOException ie) {

View File

@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.ClassSize;
@ -153,6 +154,8 @@ public class HStore implements Store {
private final KeyValue.KVComparator comparator; private final KeyValue.KVComparator comparator;
private Compactor compactor; private Compactor compactor;
private OffPeakCompactions offPeakCompactions;
private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10; private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
private static int flush_retries_number; private static int flush_retries_number;
@ -207,11 +210,11 @@ public class HStore implements Store {
// to clone it? // to clone it?
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator); scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator); this.memstore = new MemStore(conf, this.comparator);
this.offPeakCompactions = new OffPeakCompactions(conf);
// Setting up cache configuration for this family // Setting up cache configuration for this family
this.cacheConf = new CacheConfig(conf, family); this.cacheConf = new CacheConfig(conf, family);
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false); this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
if (HStore.closeCheckInterval == 0) { if (HStore.closeCheckInterval == 0) {
@ -1242,8 +1245,13 @@ public class HStore implements Store {
filesToCompact = new CompactSelection(candidates); filesToCompact = new CompactSelection(candidates);
} else { } else {
boolean isUserCompaction = priority == Store.PRIORITY_USER; boolean isUserCompaction = priority == Store.PRIORITY_USER;
boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest();
filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction, filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction,
forceMajor && filesCompacting.isEmpty()); mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
if (mayUseOffPeak && !filesToCompact.isOffPeakCompaction()) {
// Compaction policy doesn't want to do anything with off-peak.
this.offPeakCompactions.endOffPeakRequest();
}
} }
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
@ -1284,14 +1292,17 @@ public class HStore implements Store {
this.lock.readLock().unlock(); this.lock.readLock().unlock();
} }
if (ret != null) { if (ret != null) {
CompactionRequest.preRequest(ret); this.region.reportCompactionRequestStart(ret.isMajor());
} }
return ret; return ret;
} }
public void finishRequest(CompactionRequest cr) { public void finishRequest(CompactionRequest cr) {
CompactionRequest.postRequest(cr); this.region.reportCompactionRequestEnd(cr.isMajor());
cr.finishRequest(); if (cr.getCompactSelection().isOffPeakCompaction()) {
this.offPeakCompactions.endOffPeakRequest();
cr.getCompactSelection().setOffPeak(false);
}
synchronized (filesCompacting) { synchronized (filesCompacting) {
filesCompacting.removeAll(cr.getFiles()); filesCompacting.removeAll(cr.getFiles());
} }
@ -1868,7 +1879,7 @@ public class HStore implements Store {
} }
public static final long FIXED_OVERHEAD = public static final long FIXED_OVERHEAD =
ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG) ClassSize.align((21 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
+ (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN); + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD

View File

@ -31,21 +31,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public class CompactSelection { public class CompactSelection {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
static final Log LOG = LogFactory.getLog(CompactSelection.class); static final Log LOG = LogFactory.getLog(CompactSelection.class);
// the actual list - this is needed to handle methods like "sublist" // the actual list - this is needed to handle methods like "sublist" correctly
// correctly
List<StoreFile> filesToCompact = new ArrayList<StoreFile>(); List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
/**
* Number of off peak compactions either in the compaction queue or
* happening now. Please lock compactionCountLock before modifying.
*/
static long numOutstandingOffPeakCompactions = 0;
/**
* Lock object for numOutstandingOffPeakCompactions
*/
private final static Object compactionCountLock = new Object();
// was this compaction promoted to an off-peak // was this compaction promoted to an off-peak
boolean isOffPeakCompaction = false; boolean isOffPeakCompaction = false;
// CompactSelection object creation time. // CompactSelection object creation time.
@ -57,23 +44,6 @@ public class CompactSelection {
this.isOffPeakCompaction = false; this.isOffPeakCompaction = false;
} }
/**
* The current compaction finished, so reset the off peak compactions count
* if this was an off peak compaction.
*/
public void finishRequest() {
if (isOffPeakCompaction) {
long newValueToLog = -1;
synchronized(compactionCountLock) {
assert !isOffPeakCompaction : "Double-counting off-peak count for compaction";
newValueToLog = --numOutstandingOffPeakCompactions;
isOffPeakCompaction = false;
}
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
newValueToLog);
}
}
public List<StoreFile> getFilesToCompact() { public List<StoreFile> getFilesToCompact() {
return filesToCompact; return filesToCompact;
} }
@ -84,42 +54,14 @@ public class CompactSelection {
*/ */
public void emptyFileList() { public void emptyFileList() {
filesToCompact.clear(); filesToCompact.clear();
if (isOffPeakCompaction) {
long newValueToLog = -1;
synchronized(compactionCountLock) {
// reset the off peak count
newValueToLog = --numOutstandingOffPeakCompactions;
isOffPeakCompaction = false;
}
LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
newValueToLog);
}
} }
public boolean isOffPeakCompaction() { public boolean isOffPeakCompaction() {
return this.isOffPeakCompaction; return this.isOffPeakCompaction;
} }
public static long getNumOutStandingOffPeakCompactions() { public void setOffPeak(boolean value) {
synchronized(compactionCountLock) { this.isOffPeakCompaction = value;
return numOutstandingOffPeakCompactions;
}
}
/**
* Tries making the compaction off-peak.
* Only checks internal compaction constraints, not timing.
* @return Eventual value of isOffPeakCompaction.
*/
public boolean trySetOffpeak() {
assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
synchronized(compactionCountLock) {
if (numOutstandingOffPeakCompactions == 0) {
numOutstandingOffPeakCompactions++;
isOffPeakCompaction = true;
}
}
return isOffPeakCompaction;
} }
public long getSelectionTime() { public long getSelectionTime() {

View File

@ -58,8 +58,6 @@ public class CompactionConfiguration {
int maxFilesToCompact; int maxFilesToCompact;
double compactionRatio; double compactionRatio;
double offPeekCompactionRatio; double offPeekCompactionRatio;
int offPeakStartHour;
int offPeakEndHour;
long throttlePoint; long throttlePoint;
boolean shouldDeleteExpired; boolean shouldDeleteExpired;
long majorCompactionPeriod; long majorCompactionPeriod;
@ -78,17 +76,6 @@ public class CompactionConfiguration {
maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10); maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F); compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F);
offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F); offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F);
offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
this.offPeakStartHour + " end = " + this.offPeakEndHour +
". Valid numbers are [0-23]");
}
this.offPeakStartHour = this.offPeakEndHour = -1;
}
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle", throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize()); 2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
@ -104,16 +91,14 @@ public class CompactionConfiguration {
@Override @Override
public String toString() { public String toString() {
return String.format( return String.format(
"size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; " "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
+ "throttle point %d;%s delete expired; major period %d, major jitter %f", + "%s delete expired; major period %d, major jitter %f",
minCompactSize, minCompactSize,
maxCompactSize, maxCompactSize,
minFilesToCompact, minFilesToCompact,
maxFilesToCompact, maxFilesToCompact,
compactionRatio, compactionRatio,
offPeekCompactionRatio, offPeekCompactionRatio,
offPeakStartHour,
offPeakEndHour,
throttlePoint, throttlePoint,
shouldDeleteExpired ? "" : " don't", shouldDeleteExpired ? "" : " don't",
majorCompactionPeriod, majorCompactionPeriod,
@ -169,20 +154,6 @@ public class CompactionConfiguration {
return offPeekCompactionRatio; return offPeekCompactionRatio;
} }
/**
* @return Hour at which off-peak compactions start
*/
int getOffPeakStartHour() {
return offPeakStartHour;
}
/**
* @return Hour at which off-peak compactions end
*/
int getOffPeakEndHour() {
return offPeakEndHour;
}
/** /**
* @return ThrottlePoint used for classifying small and large compactions * @return ThrottlePoint used for classifying small and large compactions
*/ */
@ -212,8 +183,4 @@ public class CompactionConfiguration {
boolean shouldDeleteExpired() { boolean shouldDeleteExpired() {
return shouldDeleteExpired; return shouldDeleteExpired;
} }
private static boolean isValidHour(int hour) {
return (hour >= 0 && hour <= 23);
}
} }

View File

@ -70,7 +70,7 @@ public abstract class CompactionPolicy extends Configured {
*/ */
public abstract CompactSelection selectCompaction( public abstract CompactSelection selectCompaction(
final List<StoreFile> candidateFiles, final boolean isUserCompaction, final List<StoreFile> candidateFiles, final boolean isUserCompaction,
final boolean forceMajor) throws IOException; final boolean mayUseOffPeak, final boolean forceMajor) throws IOException;
/** /**
* @param storeFiles Store files in the store. * @param storeFiles Store files in the store.

View File

@ -50,30 +50,22 @@ import com.google.common.collect.Collections2;
public class CompactionRequest implements Comparable<CompactionRequest>, public class CompactionRequest implements Comparable<CompactionRequest>,
Runnable { Runnable {
static final Log LOG = LogFactory.getLog(CompactionRequest.class); static final Log LOG = LogFactory.getLog(CompactionRequest.class);
private final HRegion r; private final HRegion region;
private final HStore s; private final HStore store;
private final CompactSelection compactSelection; private final CompactSelection compactSelection;
private final long totalSize; private final long totalSize;
private final boolean isMajor; private final boolean isMajor;
private int p; private int priority;
private final Long timeInNanos; private final Long timeInNanos;
private HRegionServer server = null; private HRegionServer server = null;
/** public CompactionRequest(HRegion region, HStore store,
* Map to track the number of compactions requested per region (id) CompactSelection files, boolean isMajor, int priority) {
*/ Preconditions.checkNotNull(region);
private static final ConcurrentHashMap<Long, AtomicInteger>
majorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
private static final ConcurrentHashMap<Long, AtomicInteger>
minorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
public CompactionRequest(HRegion r, HStore s,
CompactSelection files, boolean isMajor, int p) {
Preconditions.checkNotNull(r);
Preconditions.checkNotNull(files); Preconditions.checkNotNull(files);
this.r = r; this.region = region;
this.s = s; this.store = store;
this.compactSelection = files; this.compactSelection = files;
long sz = 0; long sz = 0;
for (StoreFile sf : files.getFilesToCompact()) { for (StoreFile sf : files.getFilesToCompact()) {
@ -81,66 +73,10 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
} }
this.totalSize = sz; this.totalSize = sz;
this.isMajor = isMajor; this.isMajor = isMajor;
this.p = p; this.priority = priority;
this.timeInNanos = System.nanoTime(); this.timeInNanos = System.nanoTime();
} }
/**
* Find out if a given region is in compaction now.
*
* @param regionId
* @return a CompactionState
*/
public static CompactionState getCompactionState(
final long regionId) {
Long key = Long.valueOf(regionId);
AtomicInteger major = majorCompactions.get(key);
AtomicInteger minor = minorCompactions.get(key);
int state = 0;
if (minor != null && minor.get() > 0) {
state += 1; // use 1 to indicate minor here
}
if (major != null && major.get() > 0) {
state += 2; // use 2 to indicate major here
}
switch (state) {
case 3: // 3 = 2 + 1, so both major and minor
return CompactionState.MAJOR_AND_MINOR;
case 2:
return CompactionState.MAJOR;
case 1:
return CompactionState.MINOR;
default:
return CompactionState.NONE;
}
}
public static void preRequest(final CompactionRequest cr){
Long key = Long.valueOf(cr.getHRegion().getRegionId());
ConcurrentHashMap<Long, AtomicInteger> compactions =
cr.isMajor() ? majorCompactions : minorCompactions;
AtomicInteger count = compactions.get(key);
if (count == null) {
compactions.putIfAbsent(key, new AtomicInteger(0));
count = compactions.get(key);
}
count.incrementAndGet();
}
public static void postRequest(final CompactionRequest cr){
Long key = Long.valueOf(cr.getHRegion().getRegionId());
ConcurrentHashMap<Long, AtomicInteger> compactions =
cr.isMajor() ? majorCompactions : minorCompactions;
AtomicInteger count = compactions.get(key);
if (count != null) {
count.decrementAndGet();
}
}
public void finishRequest() {
this.compactSelection.finishRequest();
}
/** /**
* This function will define where in the priority queue the request will * This function will define where in the priority queue the request will
* end up. Those with the highest priorities will be first. When the * end up. Those with the highest priorities will be first. When the
@ -160,7 +96,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
} }
int compareVal; int compareVal;
compareVal = p - request.p; //compare priority compareVal = priority - request.priority; //compare priority
if (compareVal != 0) { if (compareVal != 0) {
return compareVal; return compareVal;
} }
@ -181,12 +117,12 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
/** Gets the HRegion for the request */ /** Gets the HRegion for the request */
public HRegion getHRegion() { public HRegion getHRegion() {
return r; return region;
} }
/** Gets the Store for the request */ /** Gets the Store for the request */
public HStore getStore() { public HStore getStore() {
return s; return store;
} }
/** Gets the compact selection object for the request */ /** Gets the compact selection object for the request */
@ -210,7 +146,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
/** Gets the priority for the request */ /** Gets the priority for the request */
public int getPriority() { public int getPriority() {
return p; return priority;
} }
public long getSelectionTime() { public long getSelectionTime() {
@ -219,7 +155,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
/** Gets the priority for the request */ /** Gets the priority for the request */
public void setPriority(int p) { public void setPriority(int p) {
this.p = p; this.priority = p;
} }
public void setServer(HRegionServer hrs) { public void setServer(HRegionServer hrs) {
@ -241,12 +177,12 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
} }
})); }));
return "regionName=" + r.getRegionNameAsString() + return "regionName=" + region.getRegionNameAsString() +
", storeName=" + new String(s.getFamily().getName()) + ", storeName=" + new String(store.getFamily().getName()) +
", fileCount=" + compactSelection.getFilesToCompact().size() + ", fileCount=" + compactSelection.getFilesToCompact().size() +
", fileSize=" + StringUtils.humanReadableInt(totalSize) + ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
((fsList.isEmpty()) ? "" : " (" + fsList + ")") + ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
", priority=" + p + ", time=" + timeInNanos; ", priority=" + priority + ", time=" + timeInNanos;
} }
@Override @Override
@ -257,18 +193,18 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
} }
try { try {
long start = EnvironmentEdgeManager.currentTimeMillis(); long start = EnvironmentEdgeManager.currentTimeMillis();
boolean completed = r.compact(this); boolean completed = region.compact(this);
long now = EnvironmentEdgeManager.currentTimeMillis(); long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info(((completed) ? "completed" : "aborted") + " compaction: " + LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
this + "; duration=" + StringUtils.formatTimeDiff(now, start)); this + "; duration=" + StringUtils.formatTimeDiff(now, start));
if (completed) { if (completed) {
// degenerate case: blocked regions require recursive enqueues // degenerate case: blocked regions require recursive enqueues
if (s.getCompactPriority() <= 0) { if (store.getCompactPriority() <= 0) {
server.compactSplitThread server.compactSplitThread
.requestCompaction(r, s, "Recursive enqueue"); .requestCompaction(region, store, "Recursive enqueue");
} else { } else {
// see if the compaction has caused us to exceed max region size // see if the compaction has caused us to exceed max region size
server.compactSplitThread.requestSplit(r); server.compactSplitThread.requestSplit(region);
} }
} }
} catch (IOException ex) { } catch (IOException ex) {
@ -279,7 +215,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
LOG.error("Compaction failed " + this, ex); LOG.error("Compaction failed " + this, ex);
server.checkFileSystem(); server.checkFileSystem();
} finally { } finally {
s.finishRequest(this); store.finishRequest(this);
LOG.debug("CompactSplitThread Status: " + server.compactSplitThread); LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
} }
} }

View File

@ -48,7 +48,6 @@ import com.google.common.collect.Collections2;
public class DefaultCompactionPolicy extends CompactionPolicy { public class DefaultCompactionPolicy extends CompactionPolicy {
private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class); private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class);
private final static Calendar calendar = new GregorianCalendar();
public DefaultCompactionPolicy() { public DefaultCompactionPolicy() {
compactor = new DefaultCompactor(this); compactor = new DefaultCompactor(this);
@ -80,7 +79,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* @throws java.io.IOException * @throws java.io.IOException
*/ */
public CompactSelection selectCompaction(List<StoreFile> candidateFiles, public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
boolean isUserCompaction, boolean forceMajor) final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor)
throws IOException { throws IOException {
// Preliminary compaction subject to filters // Preliminary compaction subject to filters
CompactSelection candidateSelection = new CompactSelection(candidateFiles); CompactSelection candidateSelection = new CompactSelection(candidateFiles);
@ -110,6 +109,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
if (!majorCompaction) { if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable // we're doing a minor compaction, let's see what files are applicable
candidateSelection.setOffPeak(mayUseOffPeak);
candidateSelection = filterBulk(candidateSelection); candidateSelection = filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection); candidateSelection = applyCompactionPolicy(candidateSelection);
candidateSelection = checkMinFilesCriteria(candidateSelection); candidateSelection = checkMinFilesCriteria(candidateSelection);
@ -232,6 +232,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
" files ready for compaction. Need " + minFiles + " to initiate."); " files ready for compaction. Need " + minFiles + " to initiate.");
} }
candidates.emptyFileList(); candidates.emptyFileList();
candidates.setOffPeak(false);
} }
return candidates; return candidates;
} }
@ -274,11 +275,9 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
// we're doing a minor compaction, let's see what files are applicable // we're doing a minor compaction, let's see what files are applicable
int start = 0; int start = 0;
double ratio = comConf.getCompactionRatio(); double ratio = comConf.getCompactionRatio();
if (isOffPeakHour() && candidates.trySetOffpeak()) { if (candidates.isOffPeakCompaction()) {
ratio = comConf.getCompactionRatioOffPeak(); ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
+ ", numOutstandingOffPeakCompactions is now "
+ CompactSelection.getNumOutStandingOffPeakCompactions());
} }
// get store file sizes for incremental compacting selection. // get store file sizes for incremental compacting selection.
@ -394,21 +393,4 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
int numCandidates = storeFiles.size() - filesCompacting.size(); int numCandidates = storeFiles.size() - filesCompacting.size();
return numCandidates > comConf.getMinFilesToCompact(); return numCandidates > comConf.getMinFilesToCompact();
} }
/**
* @return whether this is off-peak hour
*/
private boolean isOffPeakHour() {
int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
int startHour = comConf.getOffPeakStartHour();
int endHour = comConf.getOffPeakEndHour();
// If offpeak time checking is disabled just return false.
if (startHour == endHour) {
return false;
}
if (startHour < endHour) {
return (currentHour >= startHour && currentHour < endHour);
}
return (currentHour >= startHour || currentHour < endHour);
}
} }

View File

@ -0,0 +1,111 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.Calendar;
import java.util.GregorianCalendar;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
/**
* The class used to track off-peak hours and compactions. Off-peak compaction counter
* is global for the entire server, hours can be different per instance of this class,
* based on the configuration of the corresponding store.
*/
@InterfaceAudience.Private
public class OffPeakCompactions {
private static final Log LOG = LogFactory.getLog(OffPeakCompactions.class);
private final static Calendar calendar = new GregorianCalendar();
private int offPeakStartHour;
private int offPeakEndHour;
// TODO: replace with AtomicLong, see HBASE-7437.
/**
* Number of off peak compactions either in the compaction queue or
* happening now. Please lock compactionCountLock before modifying.
*/
private static long numOutstanding = 0;
/**
* Lock object for numOutstandingOffPeakCompactions
*/
private static final Object compactionCountLock = new Object();
public OffPeakCompactions(Configuration conf) {
offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
this.offPeakStartHour + " end = " + this.offPeakEndHour +
". Valid numbers are [0-23]");
}
this.offPeakStartHour = this.offPeakEndHour = -1;
}
}
/**
* Tries making the compaction off-peak.
* @return Whether the compaction can be made off-peak.
*/
public boolean tryStartOffPeakRequest() {
if (!isOffPeakHour()) return false;
synchronized(compactionCountLock) {
if (numOutstanding == 0) {
numOutstanding++;
return true;
}
}
return false;
}
/**
* The current compaction finished, so reset the off peak compactions count
* if this was an off peak compaction.
*/
public void endOffPeakRequest() {
long newValueToLog = -1;
synchronized(compactionCountLock) {
newValueToLog = --numOutstanding;
}
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " + newValueToLog);
}
/**
* @return whether this is off-peak hour
*/
private boolean isOffPeakHour() {
int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
// If offpeak time checking is disabled just return false.
if (this.offPeakStartHour == this.offPeakEndHour) {
return false;
}
if (this.offPeakStartHour < this.offPeakEndHour) {
return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
}
return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
}
private static boolean isValidHour(int hour) {
return (hour >= 0 && hour <= 23);
}
}

View File

@ -163,7 +163,7 @@ public class TestCompactionState {
// otherwise, the compaction should have already been done // otherwise, the compaction should have already been done
if (expectedState != state) { if (expectedState != state) {
for (HRegion region: regions) { for (HRegion region: regions) {
state = CompactionRequest.getCompactionState(region.getRegionId()); state = region.getCompactionState();
assertEquals(CompactionState.NONE, state); assertEquals(CompactionState.NONE, state);
} }
} else { } else {

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -220,16 +221,25 @@ public class TestDefaultCompactSelection extends TestCase {
void compactEquals(List<StoreFile> candidates, long... expected) void compactEquals(List<StoreFile> candidates, long... expected)
throws IOException { throws IOException {
compactEquals(candidates, false, expected); compactEquals(candidates, false, false, expected);
} }
void compactEquals(List<StoreFile> candidates, boolean forcemajor, void compactEquals(List<StoreFile> candidates, boolean forcemajor, long... expected)
throws IOException {
compactEquals(candidates, forcemajor, false, expected);
}
void compactEquals(List<StoreFile> candidates, boolean forcemajor, boolean isOffPeak,
long ... expected) long ... expected)
throws IOException { throws IOException {
store.forceMajor = forcemajor; store.forceMajor = forcemajor;
//Test Default compactions //Test Default compactions
List<StoreFile> actual = store.compactionPolicy CompactSelection result = store.compactionPolicy
.selectCompaction(candidates, false, forcemajor).getFilesToCompact(); .selectCompaction(candidates, false, isOffPeak, forcemajor);
List<StoreFile> actual = result.getFilesToCompact();
if (isOffPeak && !forcemajor) {
assertTrue(result.isOffPeakCompaction());
}
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
store.forceMajor = false; store.forceMajor = false;
} }
@ -309,36 +319,11 @@ public class TestDefaultCompactSelection extends TestCase {
* current compaction algorithm. Developed to ensure that refactoring * current compaction algorithm. Developed to ensure that refactoring
* doesn't implicitly alter this. * doesn't implicitly alter this.
*/ */
//long tooBig = maxSize + 1;
Calendar calendar = new GregorianCalendar();
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
LOG.debug("Hour of day = " + hourOfDay);
int hourPlusOne = ((hourOfDay+1)%24);
int hourMinusOne = ((hourOfDay-1+24)%24);
int hourMinusTwo = ((hourOfDay-2+24)%24);
// check compact selection without peak hour setting
LOG.debug("Testing compact selection without off-peak settings...");
compactEquals(sfCreate(999,50,12,12,1), 12, 12, 1);
// set an off-peak compaction threshold // set an off-peak compaction threshold
this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F); this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
// set peak hour to current time and check compact selection
this.conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusOne + ", " + hourPlusOne + ")");
store.compactionPolicy.updateConfiguration(); store.compactionPolicy.updateConfiguration();
compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1); // Test with and without the flag.
compactEquals(sfCreate(999, 50, 12, 12, 1), false, true, 50, 12, 12, 1);
// set peak hour outside current selection and check compact selection compactEquals(sfCreate(999, 50, 12, 12, 1), 12, 12, 1);
this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
store.compactionPolicy.updateConfiguration();
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusTwo + ", " + hourMinusOne + ")");
compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
} }
} }

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Calendar;
import java.util.GregorianCalendar;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestOffPeakCompactions {
private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Test
public void testOffPeakHours() throws IOException {
Calendar calendar = new GregorianCalendar();
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
LOG.debug("Hour of day = " + hourOfDay);
int hourPlusOne = ((hourOfDay+1)%24);
int hourMinusOne = ((hourOfDay-1+24)%24);
int hourMinusTwo = ((hourOfDay-2+24)%24);
Configuration conf = TEST_UTIL.getConfiguration();
OffPeakCompactions opc = new OffPeakCompactions(conf);
LOG.debug("Testing without off-peak settings...");
assertFalse(opc.tryStartOffPeakRequest());
// set peak hour to current time and check compact selection
conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
opc = new OffPeakCompactions(conf);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusOne + ", " + hourPlusOne + ")");
assertTrue(opc.tryStartOffPeakRequest());
opc.endOffPeakRequest();
// set peak hour outside current selection and check compact selection
conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
opc = new OffPeakCompactions(conf);
assertFalse(opc.tryStartOffPeakRequest());
}
}

View File

@ -170,7 +170,7 @@ public class PerfTestCompactionPolicies {
private List<StoreFile> runIteration(List<StoreFile> startingStoreFiles) throws IOException { private List<StoreFile> runIteration(List<StoreFile> startingStoreFiles) throws IOException {
List<StoreFile> storeFiles = new ArrayList<StoreFile>(startingStoreFiles); List<StoreFile> storeFiles = new ArrayList<StoreFile>(startingStoreFiles);
CompactSelection sel = cp.selectCompaction(storeFiles, false, false); CompactSelection sel = cp.selectCompaction(storeFiles, false, false, false);
int newFileSize = 0; int newFileSize = 0;
List<StoreFile> filesToCompact = sel.getFilesToCompact(); List<StoreFile> filesToCompact = sel.getFilesToCompact();