HBASE-8329 Limit compaction speed (binlijin)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1501496 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2013-07-09 19:49:54 +00:00
parent c4b775eb17
commit 1cf2511b30
4 changed files with 221 additions and 0 deletions

View File

@ -58,6 +58,7 @@ public abstract class Compactor {
private int compactionKVMax; private int compactionKVMax;
protected Compression.Algorithm compactionCompression; protected Compression.Algorithm compactionCompression;
private PeakCompactionsThrottle peakCompactionsThrottle;
//TODO: depending on Store is not good but, realistically, all compactors currently do. //TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(final Configuration conf, final Store store) { Compactor(final Configuration conf, final Store store) {
@ -66,6 +67,7 @@ public abstract class Compactor {
this.compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10); this.compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
this.compactionCompression = (this.store.getFamily() == null) ? this.compactionCompression = (this.store.getFamily() == null) ?
Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression(); Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
this.peakCompactionsThrottle = new PeakCompactionsThrottle(conf);
} }
/** /**
@ -201,6 +203,7 @@ public abstract class Compactor {
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
int closeCheckInterval = HStore.getCloseCheckInterval(); int closeCheckInterval = HStore.getCloseCheckInterval();
boolean hasMore; boolean hasMore;
peakCompactionsThrottle.startCompaction();
do { do {
hasMore = scanner.next(kvs, compactionKVMax); hasMore = scanner.next(kvs, compactionKVMax);
// output to writer: // output to writer:
@ -222,9 +225,12 @@ public abstract class Compactor {
} }
} }
} }
peakCompactionsThrottle.throttle(kv.getLength());
} }
kvs.clear(); kvs.clear();
} while (hasMore); } while (hasMore);
peakCompactionsThrottle.finishCompaction(this.store.getRegionInfo()
.getRegionNameAsString(), this.store.getFamily().getNameAsString());
progress.complete(); progress.complete();
return true; return true;
} }

View File

@ -37,6 +37,13 @@ public abstract class OffPeakHours {
return getInstance(startHour, endHour); return getInstance(startHour, endHour);
} }
public static OffPeakHours getInstance(Configuration conf, String start,
String end) {
int startHour = conf.getInt(start, -1);
int endHour = conf.getInt(end, -1);
return getInstance(startHour, endHour);
}
/** /**
* @param startHour inclusive * @param startHour inclusive
* @param endHour exclusive * @param endHour exclusive

View File

@ -0,0 +1,138 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
/**
* The class used to track peak hours and compactions. peak compaction speed
* should be limit.
*
*/
@InterfaceAudience.Private
public class PeakCompactionsThrottle {
private static final Log LOG = LogFactory.getLog(PeakCompactionsThrottle.class);
public static final String PEAK_START_HOUR = "hbase.peak.start.hour";
public static final String PEAK_END_HOUR = "hbase.peak.end.hour";
public static final String PEAK_COMPACTION_SPEED_ALLOWED =
"hbase.regionserver.compaction.peak.maxspeed";
public static final String PEAK_COMPACTION_SPEED_CHECK_INTERVAL =
"hbase.regionserver.compaction.speed.check.interval";
OffPeakHours offPeakHours;
private long start;
private long end;
private long maxSpeedInPeak;
private int sleepNumber = 0;
private int sleepTimeTotal = 0;
int bytesWritten = 0;
int checkInterval = 0;
public PeakCompactionsThrottle(Configuration conf) {
offPeakHours = OffPeakHours.getInstance(conf, PEAK_START_HOUR, PEAK_END_HOUR);
maxSpeedInPeak = conf.getInt(PEAK_COMPACTION_SPEED_ALLOWED, 30 * 1024 * 1024 /* 30 MB/s */);
checkInterval = conf.getInt(PEAK_COMPACTION_SPEED_CHECK_INTERVAL, 30 * 1024 * 1024 /* 30 MB */);
}
/**
* start compaction
*/
public void startCompaction() {
start = System.currentTimeMillis();
}
/**
* finish compaction
*/
public void finishCompaction(String region, String family) {
if (sleepNumber > 0) {
LOG.info("Region '" + region + "' family '" + family + "' 's maxSpeedInPeak is "
+ StringUtils.humanReadableInt(maxSpeedInPeak) + "/s compaction throttle: sleep number "
+ sleepNumber + " sleep time " + sleepTimeTotal + "(ms)");
}
}
/**
* reset start time
*/
void resetStartTime() {
start = System.currentTimeMillis();
}
/**
* Peak compaction throttle, if it is peak time and the compaction speed is too fast, sleep for a
* while to slow down.
*/
public void throttle(long numOfBytes) throws IOException {
bytesWritten += numOfBytes;
if (bytesWritten >= checkInterval) {
checkAndSlowFastCompact(bytesWritten);
bytesWritten = 0;
}
}
private void checkAndSlowFastCompact(long numOfBytes) throws IOException {
if (!isPeakHour()) {
// not peak hour, just return.
return;
}
if (maxSpeedInPeak <= 0) {
return;
}
end = System.currentTimeMillis();
long minTimeAllowed = numOfBytes * 1000 / maxSpeedInPeak; // ms
long elapsed = end - start;
if (elapsed < minTimeAllowed) {
// too fast
try {
// sleep for a while to slow down.
Thread.sleep(minTimeAllowed - elapsed);
sleepNumber++;
sleepTimeTotal += (minTimeAllowed - elapsed);
} catch (InterruptedException ie) {
IOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
}
}
resetStartTime();
}
/**
* @return whether this is peak hour
*/
private boolean isPeakHour() {
return offPeakHours.isOffPeakHour();
}
/**
* For test
*/
public int getSleepNumber() {
return sleepNumber;
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestPeakCompactionsThrottle {
private static HBaseTestingUtility testUtil;
private Configuration conf;
@BeforeClass
public static void setUpClass() {
testUtil = new HBaseTestingUtility();
}
@Before
public void setUp() {
conf = testUtil.getConfiguration();
}
@Test
public void testSetPeakHourToTargetTime() throws IOException {
conf.set(PeakCompactionsThrottle.PEAK_START_HOUR, "0");
conf.set(PeakCompactionsThrottle.PEAK_END_HOUR, "23");
PeakCompactionsThrottle peakCompactionsThrottle = new PeakCompactionsThrottle(conf);
peakCompactionsThrottle.startCompaction();
long numOfBytes = 60 * 1024 * 1024;
peakCompactionsThrottle.throttle(numOfBytes);
peakCompactionsThrottle.finishCompaction("region", "family");
assertTrue(peakCompactionsThrottle.getSleepNumber() > 0);
}
@Test
public void testSetPeakHourOutsideCurrentSelection() throws IOException {
conf.set(PeakCompactionsThrottle.PEAK_START_HOUR, "-1");
conf.set(PeakCompactionsThrottle.PEAK_END_HOUR, "-1");
PeakCompactionsThrottle peakCompactionsThrottle = new PeakCompactionsThrottle(conf);
peakCompactionsThrottle.startCompaction();
long numOfBytes = 30 * 1024 * 1024;
peakCompactionsThrottle.throttle(numOfBytes);
peakCompactionsThrottle.finishCompaction("region", "family");
assertTrue(peakCompactionsThrottle.getSleepNumber() == 0);
}
}