HBASE-4463 [jira] Run more aggressive compactions during off peak hours

Summary:
HBASE-4463 Run more aggressive compactions during off peak hours

Increases the compact selection ratio from 1.2 to 5 during off-peak hours. This
helps utilize the available iops and bandwidth to decrease the average number
of files per store. Only one such aggressive compaction is present in the
compaction queue at any point in time.

Depending on the application usage pattern, disk iops and top-of-rack bandwidth
utilization are much lower during off-peak hours than during peak hours. We can
use this headroom to improve the performance of the HBase cluster by applying a
much larger compact selection ratio during off-peak hours: instead of
hbase.hstore.compaction.ratio (default 1.2), compactions selected inside the
off-peak window use hbase.hstore.compaction.ratio.offpeak (default 5). This
helps reduce the average number of files per store.
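
As an illustrative sketch only (not part of this patch), the new settings could
be enabled roughly as follows. The property names are the ones read by
CompactSelection below; the hour values and the example class name are made up,
and in a real deployment these settings would normally live in hbase-site.xml.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class OffPeakCompactionConfigExample {
  public static Configuration offPeakConf() {
    Configuration conf = HBaseConfiguration.create();
    // Selection ratios; the values below are the defaults introduced by this change.
    conf.setFloat("hbase.hstore.compaction.ratio", 1.2F);
    conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
    // The off-peak window is [start, end) in hours of the day, valid values 0-23.
    // Both default to -1, which leaves off-peak compactions disabled.
    conf.setInt("hbase.offpeak.start.hour", 0);  // example: midnight ...
    conf.setInt("hbase.offpeak.end.hour", 6);    // ... to 6 am
    return conf;
  }
}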

Test Plan: Started running the unit tests.

Reviewers: JIRA, Kannan, nspiegelberg, mbautin, stack

Reviewed By: nspiegelberg

CC: nspiegelberg, tedyu, lhofhansl, Karthik

Differential Revision: 471

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1208885 13f79535-47bb-0310-9956-ffa450edef68
Author: karthik
Date:   2011-11-30 23:42:21 +00:00
Parent: a9ca67a2c5
Commit: 50de126b79
5 changed files with 275 additions and 50 deletions

File: HRegion.java

@@ -1095,7 +1095,7 @@ public class HRegion implements HeapSize { // , Writable{
}
}
LOG.info("Starting compaction on " + cr.getStore() + " in region "
+ this);
+ this + (cr.getCompactSelection().isOffPeakCompaction()?" as an off-peak compaction":""));
doRegionCompactionPrep();
try {
status.setStatus("Compacting store " + cr.getStore());

File: Store.java

@@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
@@ -109,9 +111,6 @@ public class Store extends SchemaConfigured implements HeapSize {
private final int maxFilesToCompact;
private final long minCompactSize;
private final long maxCompactSize;
// compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
// With float, java will downcast your long to float for comparisons (bad)
private double compactRatio;
private long lastCompactSize = 0;
volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
@@ -221,7 +220,6 @@ public class Store extends SchemaConfigured implements HeapSize {
this.region.memstoreFlushSize);
this.maxCompactSize
= conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
this.compactionKVMax = conf.getInt("hbase.hstore.compaction.kv.max", 10);
if (Store.closeCheckInterval == 0) {
@@ -1040,35 +1038,35 @@ public class Store extends SchemaConfigured implements HeapSize {
override = region.getCoprocessorHost().preCompactSelection(
this, candidates);
}
List<StoreFile> filesToCompact;
CompactSelection filesToCompact;
if (override) {
// coprocessor is overriding normal file selection
filesToCompact = candidates;
filesToCompact = new CompactSelection(conf, candidates);
} else {
filesToCompact = compactSelection(candidates);
}
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompactSelection(this,
ImmutableList.copyOf(filesToCompact));
ImmutableList.copyOf(filesToCompact.getFilesToCompact()));
}
// no files to compact
if (filesToCompact.isEmpty()) {
if (filesToCompact.getFilesToCompact().isEmpty()) {
return null;
}
// basic sanity check: do not try to compact the same StoreFile twice.
if (!Collections.disjoint(filesCompacting, filesToCompact)) {
if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) {
// TODO: change this from an IAE to LOG.error after sufficient testing
Preconditions.checkArgument(false, "%s overlaps with %s",
filesToCompact, filesCompacting);
}
filesCompacting.addAll(filesToCompact);
filesCompacting.addAll(filesToCompact.getFilesToCompact());
Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
// major compaction iff all StoreFiles are included
boolean isMajor = (filesToCompact.size() == this.storefiles.size());
boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
@@ -1089,6 +1087,7 @@ public class Store extends SchemaConfigured implements HeapSize {
}
public void finishRequest(CompactionRequest cr) {
cr.finishRequest();
synchronized (filesCompacting) {
filesCompacting.removeAll(cr.getFiles());
}
@@ -1113,7 +1112,7 @@ public class Store extends SchemaConfigured implements HeapSize {
* @return subset copy of candidate list that meets compaction criteria
* @throws IOException
*/
List<StoreFile> compactSelection(List<StoreFile> candidates)
CompactSelection compactSelection(List<StoreFile> candidates)
throws IOException {
// ASSUMPTION!!! filesCompacting is locked when calling this function
@@ -1128,41 +1127,47 @@ public class Store extends SchemaConfigured implements HeapSize {
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
List<StoreFile> filesToCompact = new ArrayList<StoreFile>(candidates);
CompactSelection compactSelection = new CompactSelection(conf, candidates);
boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
if (!forcemajor) {
// do not compact old files above a configurable threshold
// save all references. we MUST compact them
int pos = 0;
while (pos < filesToCompact.size() &&
filesToCompact.get(pos).getReader().length() > maxCompactSize &&
!filesToCompact.get(pos).isReference()) ++pos;
filesToCompact.subList(0, pos).clear();
while (pos < compactSelection.getFilesToCompact().size() &&
compactSelection.getFilesToCompact().get(pos).getReader().length()
> maxCompactSize &&
!compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
compactSelection.clearSubList(0, pos);
}
if (filesToCompact.isEmpty()) {
if (compactSelection.getFilesToCompact().isEmpty()) {
LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
this.storeNameStr + ": no store files to compact");
return filesToCompact;
compactSelection.emptyFileList();
return compactSelection;
}
// major compact on user action or age (caveat: we have too many files)
boolean majorcompaction = filesToCompact.size() < this.maxFilesToCompact
&& (forcemajor || isMajorCompaction(filesToCompact));
boolean majorcompaction =
(forcemajor || isMajorCompaction(compactSelection.getFilesToCompact()))
&& compactSelection.getFilesToCompact().size() < this.maxFilesToCompact;
if (!majorcompaction && !hasReferences(filesToCompact)) {
if (!majorcompaction &&
!hasReferences(compactSelection.getFilesToCompact())) {
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double r = this.compactRatio;
double r = compactSelection.getCompactSelectionRatio();
// skip selection algorithm if we don't have enough files
if (filesToCompact.size() < this.minFilesToCompact) {
return Collections.emptyList();
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
compactSelection.emptyFileList();
return compactSelection;
}
// remove bulk import files that request to be excluded from minors
filesToCompact.removeAll(Collections2.filter(filesToCompact,
compactSelection.getFilesToCompact().removeAll(Collections2.filter(
compactSelection.getFilesToCompact(),
new Predicate<StoreFile>() {
public boolean apply(StoreFile input) {
return input.excludeFromMinorCompaction();
@@ -1175,11 +1180,11 @@ public class Store extends SchemaConfigured implements HeapSize {
*/
// get store file sizes for incremental compacting selection.
int countOfFiles = filesToCompact.size();
int countOfFiles = compactSelection.getFilesToCompact().size();
long [] fileSizes = new long[countOfFiles];
long [] sumSize = new long[countOfFiles];
for (int i = countOfFiles-1; i >= 0; --i) {
StoreFile file = filesToCompact.get(i);
StoreFile file = compactSelection.getFilesToCompact().get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
int tooFar = i + this.maxFilesToCompact - 1;
@@ -1209,26 +1214,28 @@ public class Store extends SchemaConfigured implements HeapSize {
int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
long totalSize = fileSizes[start]
+ ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
filesToCompact = filesToCompact.subList(start, end);
compactSelection = compactSelection.getSubList(start, end);
// if we don't have enough files to compact, just wait
if (filesToCompact.size() < this.minFilesToCompact) {
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipped compaction of " + this.storeNameStr
+ ". Only " + (end - start) + " file(s) of size "
+ StringUtils.humanReadableInt(totalSize)
+ " have met compaction criteria.");
}
return Collections.emptyList();
compactSelection.emptyFileList();
return compactSelection;
}
} else {
// all files included in this compaction, up to max
if (filesToCompact.size() > this.maxFilesToCompact) {
int pastMax = filesToCompact.size() - this.maxFilesToCompact;
filesToCompact.subList(0, pastMax).clear();
if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
int pastMax =
compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
compactSelection.clearSubList(0, pastMax);
}
}
return filesToCompact;
return compactSelection;
}
/**
@@ -2017,8 +2024,7 @@ public class Store extends SchemaConfigured implements HeapSize {
public static final long FIXED_OVERHEAD =
ClassSize.align(new SchemaConfigured().heapSize()
+ (18 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+ (1 * Bytes.SIZEOF_DOUBLE) + (5 * Bytes.SIZEOF_INT)
+ Bytes.SIZEOF_BOOLEAN);
+ (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK

File: CompactSelection.java (new file)

@@ -0,0 +1,169 @@
/**
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.StoreFile;
public class CompactSelection {
private static final long serialVersionUID = 1L;
static final Log LOG = LogFactory.getLog(CompactSelection.class);
// the actual list - this is needed to handle methods like "sublist"
// correctly
List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
// number of off peak compactions either in the compaction queue or
// happening now
public static Integer numOutstandingOffPeakCompactions = 0;
// HBase conf object
Configuration conf;
// was this compaction promoted to an off-peak
boolean isOffPeakCompaction = false;
// compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
// With float, java will downcast your long to float for comparisons (bad)
private double compactRatio;
// compaction ratio off-peak
private double compactRatioOffPeak;
// offpeak start time
private int offPeakStartHour = -1;
// off peak end time
private int offPeakEndHour = -1;
public CompactSelection(Configuration conf, List<StoreFile> filesToCompact) {
this.filesToCompact = filesToCompact;
this.conf = conf;
this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
// The off-peak window is [offPeakStartHour, offPeakEndHour). Valid hours are [0, 23]
this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) {
if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) {
LOG.warn("Invalid start/end hour for peak hour : start = " +
this.offPeakStartHour + " end = " + this.offPeakEndHour +
". Valid numbers are [0-23]");
}
this.offPeakStartHour = this.offPeakEndHour = -1;
}
}
/**
* If the current hour falls within the off-peak window and there is no
* outstanding off-peak compaction, the current compaction is promoted to
* an off-peak compaction. At most one off-peak compaction is present in
* the compaction queue at any time.
*
* @return the compaction selection ratio to use for this selection
*/
public double getCompactSelectionRatio() {
double r = this.compactRatio;
synchronized(numOutstandingOffPeakCompactions) {
if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) {
r = this.compactRatioOffPeak;
numOutstandingOffPeakCompactions++;
isOffPeakCompaction = true;
}
}
if(isOffPeakCompaction) {
LOG.info("Running an off-peak compaction, selection ratio = " +
compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " +
numOutstandingOffPeakCompactions);
}
return r;
}
/**
* The current compaction finished, so reset the off peak compactions count
* if this was an off peak compaction.
*/
public void finishRequest() {
if (isOffPeakCompaction) {
synchronized(numOutstandingOffPeakCompactions) {
numOutstandingOffPeakCompactions--;
isOffPeakCompaction = false;
}
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
numOutstandingOffPeakCompactions);
}
}
public List<StoreFile> getFilesToCompact() {
return filesToCompact;
}
/**
* Removes all files from the current compaction list and, if this selection
* had been promoted to an off-peak compaction, resets the outstanding
* off-peak compaction count.
*/
public void emptyFileList() {
filesToCompact.clear();
if (isOffPeakCompaction) {
synchronized(numOutstandingOffPeakCompactions) {
// reset the off peak count
numOutstandingOffPeakCompactions--;
isOffPeakCompaction = false;
}
LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
numOutstandingOffPeakCompactions);
}
}
public boolean isOffPeakCompaction() {
return this.isOffPeakCompaction;
}
private boolean isOffPeakHour() {
int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY);
// If offpeak time checking is disabled just return false.
if (this.offPeakStartHour == this.offPeakEndHour) {
return false;
}
if (this.offPeakStartHour < this.offPeakEndHour) {
return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
}
return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
}
public CompactSelection subList(int start, int end) {
throw new UnsupportedOperationException();
}
public CompactSelection getSubList(int start, int end) {
filesToCompact = filesToCompact.subList(start, end);
return this;
}
public void clearSubList(int start, int end) {
filesToCompact.subList(start, end).clear();
}
private boolean isValidHour(int hour) {
return (hour >= 0 && hour <= 23);
}
}

File: CompactionRequest.java

@@ -48,7 +48,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
static final Log LOG = LogFactory.getLog(CompactionRequest.class);
private final HRegion r;
private final Store s;
private final List<StoreFile> files;
private final CompactSelection compactSelection;
private final long totalSize;
private final boolean isMajor;
private int p;
@@ -56,15 +56,15 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
private HRegionServer server = null;
public CompactionRequest(HRegion r, Store s,
List<StoreFile> files, boolean isMajor, int p) {
CompactSelection files, boolean isMajor, int p) {
Preconditions.checkNotNull(r);
Preconditions.checkNotNull(files);
this.r = r;
this.s = s;
this.files = files;
this.compactSelection = files;
long sz = 0;
for (StoreFile sf : files) {
for (StoreFile sf : files.getFilesToCompact()) {
sz += sf.getReader().length();
}
this.totalSize = sz;
@@ -73,6 +73,10 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
this.timeInNanos = System.nanoTime();
}
public void finishRequest() {
this.compactSelection.finishRequest();
}
/**
* This function will define where in the priority queue the request will
* end up. Those with the highest priorities will be first. When the
@@ -116,9 +120,14 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
return s;
}
/** Gets the compact selection object for the request */
public CompactSelection getCompactSelection() {
return compactSelection;
}
/** Gets the StoreFiles for the request */
public List<StoreFile> getFiles() {
return files;
return compactSelection.getFilesToCompact();
}
/** Gets the total size of all StoreFiles in compaction */
@@ -147,7 +156,8 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
@Override
public String toString() {
String fsList = Joiner.on(", ").join(
Collections2.transform(Collections2.filter(files,
Collections2.transform(Collections2.filter(
compactSelection.getFilesToCompact(),
new Predicate<StoreFile>() {
public boolean apply(StoreFile sf) {
return sf.getReader() != null;
@@ -160,7 +170,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
return "regionName=" + r.getRegionNameAsString() +
", storeName=" + new String(s.getFamily().getName()) +
", fileCount=" + files.size() +
", fileCount=" + compactSelection.getFilesToCompact().size() +
", fileSize=" + StringUtils.humanReadableInt(totalSize) +
((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
", priority=" + p + ", time=" + timeInNanos;

File: TestCompactSelection.java

@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.List;
import junit.framework.TestCase;
@@ -33,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
@@ -160,7 +163,7 @@ public class TestCompactSelection extends TestCase {
long ... expected)
throws IOException {
store.forceMajor = forcemajor;
List<StoreFile> actual = store.compactSelection(candidates);
List<StoreFile> actual = store.compactSelection(candidates).getFilesToCompact();
store.forceMajor = false;
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
}
@@ -190,7 +193,7 @@ public class TestCompactSelection extends TestCase {
*/
// don't exceed max file compact threshold
assertEquals(maxFiles,
store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
store.compactSelection(sfCreate(7,6,5,4,3,2,1)).getFilesToCompact().size());
/* MAJOR COMPACTION */
// if a major compaction has been forced, then compact everything
@@ -202,7 +205,7 @@ public class TestCompactSelection extends TestCase {
// don't exceed max file compact threshold, even with major compaction
store.forceMajor = true;
assertEquals(maxFiles,
store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
store.compactSelection(sfCreate(7,6,5,4,3,2,1)).getFilesToCompact().size());
store.forceMajor = false;
// if we exceed maxCompactSize, downgrade to minor
@@ -223,11 +226,48 @@ public class TestCompactSelection extends TestCase {
compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
// reference files should obey max file compact to avoid OOM
assertEquals(maxFiles,
store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).size());
store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).getFilesToCompact().size());
// empty case
compactEquals(new ArrayList<StoreFile>() /* empty */);
// empty case (because all files are too big)
compactEquals(sfCreate(tooBig, tooBig) /* empty */);
}
public void testOffPeakCompactionRatio() throws IOException {
/*
* NOTE: these tests are specific to describe the implementation of the
* current compaction algorithm. Developed to ensure that refactoring
* doesn't implicitly alter this.
*/
long tooBig = maxSize + 1;
Calendar calendar = new GregorianCalendar();
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
LOG.debug("Hour of day = " + hourOfDay);
int hourPlusOne = ((hourOfDay+1+24)%24);
int hourMinusOne = ((hourOfDay-1+24)%24);
int hourMinusTwo = ((hourOfDay-2+24)%24);
// check compact selection without peak hour setting
LOG.debug("Testing compact selection without off-peak settings...");
compactEquals(sfCreate(999,50,12,12,1), 12, 12, 1);
// set an off-peak compaction threshold
this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
// set peak hour to current time and check compact selection
this.conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusOne + ", " + hourPlusOne + ")");
compactEquals(sfCreate(999,50,12,12, 1), 50, 12, 12, 1);
// set peak hour outside current selection and check compact selection
this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusTwo + ", " + hourMinusOne + ")");
compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
}
}